11import datetime
22import os
33from typing import List
4+ import gzip
5+ import tempfile
46
57import pandas as pd
68from psycopg2 import sql
@@ -26,15 +28,21 @@ def add_metadata_to_csv(filename: str):
2628 logger .info (f"added metadata to { filename } ." )
2729
2830
def write_sql_to_gzipped_csv(filename: str, sql_query: sql.SQL):
    """
    Use the copy statement to write data from postgres to a gzipped csv file.

    The query result is first staged as a plain csv in a private temporary
    directory, then compressed into `filename`.

    Parameters
    ----------
    filename: str
        Destination path for the gzipped csv (conventionally `*.csv.gz`).
    sql_query: sql.SQL
        A `COPY (...) TO STDOUT WITH CSV HEADER` statement.
    """
    pg_db = auth.postgresDB()

    # TemporaryDirectory gives us a unique, private staging area, so
    # concurrent runs cannot clobber each other's temp file, and it is
    # removed automatically when the context exits — even on error.
    # (The previous approach used the private tempfile._get_default_tempdir()
    # with a fixed 'tmp.csv' name and never deleted the file.)
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_csv_file = os.path.join(tmp_dir, "tmp.csv")

        with open(tmp_csv_file, "w") as f:
            pg_db.copy_expert(sql_query, f)

        # re-read the staged csv as bytes and compress it to the destination
        with open(tmp_csv_file, "rb") as f_in, gzip.open(filename, "wb") as f_out:
            f_out.writelines(f_in)

    logger.info(f"wrote gzipped csv file from sql: {filename}")
3846
3947
def load_df_from_csv(filename: str) -> pd.DataFrame:
    """
    Load a gzipped csv file into a pandas DataFrame.

    The id columns are read as strings so that ids are not mangled by
    numeric inference (e.g. leading zeros dropped or cast to float).
    """
    id_columns_as_str = {"project_id": str, "group_id": str, "task_id": str}

    df = pd.read_csv(filename, dtype=id_columns_as_str, compression="gzip")
    logger.info(f"loaded pandas df from {filename}")
    return df
5062
@@ -73,7 +85,7 @@ def get_results(filename: str, project_id: str) -> pd.DataFrame:
7385 ) TO STDOUT WITH CSV HEADER
7486 """
7587 ).format (sql .Literal (project_id ))
76- write_sql_to_csv (filename , sql_query )
88+ write_sql_to_gzipped_csv (filename , sql_query )
7789
7890 df = load_df_from_csv (filename )
7991
@@ -117,7 +129,7 @@ def get_tasks(filename: str, project_id: str) -> pd.DataFrame:
117129 ) TO STDOUT WITH CSV HEADER
118130 """
119131 ).format (sql .Literal (project_id ))
120- write_sql_to_csv (filename , sql_query )
132+ write_sql_to_gzipped_csv (filename , sql_query )
121133
122134 df = load_df_from_csv (filename )
123135 return df
@@ -152,7 +164,7 @@ def get_groups(filename: str, project_id: str) -> pd.DataFrame:
152164 ) TO STDOUT WITH CSV HEADER
153165 """
154166 ).format (sql .Literal (project_id ))
155- write_sql_to_csv (filename , sql_query )
167+ write_sql_to_gzipped_csv (filename , sql_query )
156168
157169 df = load_df_from_csv (filename )
158170 return df
@@ -322,11 +334,11 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
322334 """
323335
324336 # set filenames
325- results_filename = f"{ DATA_PATH } /api/results/results_{ project_id } .csv"
326- tasks_filename = f"{ DATA_PATH } /api/tasks/tasks_{ project_id } .csv"
327- groups_filename = f"{ DATA_PATH } /api/groups/groups_{ project_id } .csv"
328- agg_results_filename = f"{ DATA_PATH } /api/agg_results/agg_results_{ project_id } .csv"
329- agg_results_by_user_id_filename = f"{ DATA_PATH } /api/users/users_{ project_id } .csv"
337+ results_filename = f"{ DATA_PATH } /api/results/results_{ project_id } .csv.gz "
338+ tasks_filename = f"{ DATA_PATH } /api/tasks/tasks_{ project_id } .csv.gz "
339+ groups_filename = f"{ DATA_PATH } /api/groups/groups_{ project_id } .csv.gz "
340+ agg_results_filename = f"{ DATA_PATH } /api/agg_results/agg_results_{ project_id } .csv.gz "
341+ agg_results_by_user_id_filename = f"{ DATA_PATH } /api/users/users_{ project_id } .csv.gz "
330342 project_stats_by_date_filename = f"{ DATA_PATH } /api/history/history_{ project_id } .csv"
331343
332344 # load data from postgres or local storage if already downloaded
@@ -339,11 +351,24 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
339351 groups_df = get_groups (groups_filename , project_id )
340352 tasks_df = get_tasks (tasks_filename , project_id )
341353
354+ if any ("maxar" in s for s in project_info ["tile_server_names" ]):
355+ add_metadata = True
356+ else :
357+ add_metadata = False
358+
342359 # aggregate results by task id
343360 agg_results_df = get_agg_results_by_task_id (results_df , tasks_df )
344- agg_results_df .to_csv (agg_results_filename , index_label = "idx" )
361+ agg_results_df .to_csv (
362+ agg_results_filename ,
363+ index_label = "idx"
364+ )
365+
366+ geojson_functions .gzipped_csv_to_gzipped_geojson (
367+ filename = agg_results_filename ,
368+ geometry_field = "geom" ,
369+ add_metadata = add_metadata
370+ )
345371 logger .info (f"saved agg results for { project_id } : { agg_results_filename } " )
346- geojson_functions .csv_to_geojson (agg_results_filename , "geom" )
347372
348373 # aggregate results by user id
349374 # TODO: solve memory issue for agg results by user id
@@ -352,7 +377,8 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
352377 results_df , agg_results_df
353378 )
354379 agg_results_by_user_id_df .to_csv (
355- agg_results_by_user_id_filename , index_label = "idx"
380+ agg_results_by_user_id_filename ,
381+ index_label = "idx"
356382 )
357383 logger .info (
358384 f"saved agg results for { project_id } : { agg_results_by_user_id_filename } "
@@ -361,10 +387,6 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
361387 sentry .capture_exception ()
362388 logger .info (f"failed to agg results by user id for { project_id } " )
363389
364- if any ("maxar" in s for s in project_info ["tile_server_names" ]):
365- add_metadata_to_csv (agg_results_filename )
366- geojson_functions .add_metadata_to_geojson (agg_results_filename )
367-
368390 project_stats_by_date_df = project_stats_by_date .get_project_history (
369391 results_df , groups_df
370392 )
@@ -380,7 +402,10 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
380402 # do not do this for ArbitraryGeometry / BuildingFootprint projects
381403 logger .info (f"do NOT generate tasking manager geometries for { project_id } " )
382404 else :
383- tasking_manager_geometries .generate_tasking_manager_geometries (project_id )
405+ tasking_manager_geometries .generate_tasking_manager_geometries (
406+ project_id = project_id ,
407+ agg_results_filename = agg_results_filename
408+ )
384409
385410 # prepare output of function
386411 project_stats_dict = {
0 commit comments