11import datetime
22import gzip
3+ import json
34import os
45import tempfile
56from typing import List
67
78import pandas as pd
9+ from pandas .api .types import is_numeric_dtype
810from psycopg2 import sql
911
1012from mapswipe_workers import auth
@@ -28,6 +30,30 @@ def add_metadata_to_csv(filename: str):
2830 logger .info (f"added metadata to { filename } ." )
2931
3032
def normalize_project_type_specifics(path):
    """Flatten the JSON column ``project_type_specifics`` of a CSV file in place.

    The CSV at ``path`` is read, rewritten and saved back to the same path.

    If the file has a non-numeric ``project_type_specifics`` column, every
    cell is parsed as JSON and expanded into one column per (nested) key
    using ``pd.json_normalize``. The ``properties.`` prefix produced for
    keys nested below ``properties`` is stripped from the new column names,
    and the original JSON column is removed.

    Columns that contain no values at all are dropped. Rows are never
    dropped, so tasks with a partially filled JSON object are kept.

    Args:
        path: Path of the CSV file to normalize; it is overwritten.
    """
    json_column = "project_type_specifics"
    df = pd.read_csv(path)

    # A column that is empty for every row is read as float (all NaN), so the
    # numeric check skips files that carry no JSON at all.
    if json_column in df.columns and not is_numeric_dtype(df[json_column]):
        # Cells are JSON strings; a missing cell (NaN) is treated as an empty
        # object instead of being handed to json.loads, which would raise.
        parsed = [
            json.loads(cell) if isinstance(cell, str) else {}
            for cell in df[json_column]
        ]
        normalized = pd.json_normalize(parsed)
        # Align on the frame's index so concat pairs rows positionally.
        normalized.index = df.index
        normalized.columns = [
            name.replace("properties.", "") for name in normalized.columns
        ]
        df = pd.concat([df.drop(columns=[json_column]), normalized], axis=1)

    # Drop only columns without any value. Dropping rows that contain a NaN
    # would discard valid records and would empty the file whenever a column
    # (e.g. an all-NULL project_type_specifics) has no values.
    df.dropna(axis=1, how="all", inplace=True)
    # index=False keeps the pandas row index out of the exported file.
    df.to_csv(path, index=False)
55+
56+
3157def write_sql_to_gzipped_csv (filename : str , sql_query : sql .SQL ):
3258 """
3359 Use the copy statement to write data from postgres to a csv file.
@@ -39,6 +65,8 @@ def write_sql_to_gzipped_csv(filename: str, sql_query: sql.SQL):
3965 with open (tmp_csv_file , "w" ) as f :
4066 pg_db .copy_expert (sql_query , f )
4167
68+ normalize_project_type_specifics (tmp_csv_file )
69+
4270 with open (tmp_csv_file , "rb" ) as f_in , gzip .open (filename , "wb" ) as f_out :
4371 f_out .writelines (f_in )
4472
@@ -118,19 +146,12 @@ def get_tasks(filename: str, project_id: str) -> pd.DataFrame:
118146
119147 sql_query = sql .SQL (
120148 """
121- COPY (
122- SELECT project_id, group_id, task_id, ST_AsText(geom) as geom,
123- (project_type_specifics->'properties'->'osmId')::text as osmId,
124- (project_type_specifics->'properties'->'changesetId')::text::int as changesetId,
125- (project_type_specifics->'properties'->'version')::text::smallint as version,
126- (project_type_specifics->'properties'->'userid')::text::int as userid,
127- (project_type_specifics->'properties'->'username')::text as username,
128- (project_type_specifics->'properties'->'editor')::text as editor,
129- (project_type_specifics->'properties'->'comment')::text as comment,
130- (project_type_specifics->'properties'->'lastEdit')::text::timestamp as lastEdit
131- FROM tasks
132- WHERE project_id = {}
133- ) TO STDOUT WITH CSV HEADER
149+ COPY (
150+ SELECT project_id, group_id, task_id, ST_AsText(geom) as geom,
151+ project_type_specifics
152+ FROM tasks
153+ WHERE project_id = {}
154+ ) TO STDOUT WITH CSV HEADER
134155 """
135156 ).format (sql .Literal (project_id ))
136157 write_sql_to_gzipped_csv (filename , sql_query )
@@ -311,23 +332,9 @@ def get_agg_results_by_task_id(
311332 )
312333
313334 # add task geometry using left join
335+ tasks_df .drop (columns = ["project_id" , "group_id" ], inplace = True )
314336 agg_results_df = results_by_task_id_df .merge (
315- tasks_df [
316- [
317- "geom" ,
318- "task_id" ,
319- "osmid" ,
320- "changesetid" ,
321- "version" ,
322- "userid" ,
323- "username" ,
324- "editor" ,
325- "comment" ,
326- "lastedit" ,
327- ]
328- ],
329- left_on = "task_id" ,
330- right_on = "task_id" ,
337+ tasks_df , left_on = "task_id" , right_on = "task_id" ,
331338 )
332339 logger .info ("added geometry to aggregated results" )
333340
0 commit comments