77import sys
88
99from time import time , sleep
10- from pandas .core .frame import DataFrame
1110from pandas .core .series import Series
1211from pathlib import Path
1312from psycopg2 import OperationalError
2322CSV_DIR = os .getenv ('CSV_DIR' )
2423TABLES_META = os .getenv ('TABLES_META' )
2524
25+
2626def simplify_name (fname : str ) -> str :
2727 fname = re .sub ('\.csv$' , '' , fname )
2828 fname = re .sub ('[^a-zA-Z0-9]' , '_' , fname )
2929 return fname .lower ()
3030
31+
3132def copy_expression (tname : str , fields : Series ) -> str :
3233 tfields = list ()
3334 for field , ftype in fields .items ():
3435 tfield = simplify_name (field )
3536 if ftype == 'bool' :
36- tfields .append (f'case when "{ tfield } " = true then \' True\' else \' False\' end as "{ tfield } "' )
37+ tfields .append (
38+ f'case when "{ tfield } " = true then \' TRUE\' '
39+ f' else \' FALSE\' end as "{ tfield } "'
40+ )
41+ elif ftype == 'float64' :
42+ tfields .append (
43+ f'case when "{ tfield } "::text not like \' %.%\' '
44+ f' then "{ tfield } "::text||\' .0\' '
45+ f' else "{ tfield } "::text end as "{ tfield } "'
46+ )
3747 else :
3848 tfields .append (f'"{ tfield } "' )
3949
4050 tfields = ',' .join (tfields )
4151 return f"(select { tfields } from { tname } order by id)"
4252
53+
4354def create_table (conn : connection , fname : str , fields : Series ) -> dict :
4455 tname = simplify_name (fname )
4556 meta = {
@@ -48,6 +59,7 @@ def create_table(conn: connection, fname: str, fields: Series) -> dict:
4859 'filename' : fname ,
4960 'fields' : [],
5061 'tablefields' : [],
62+ 'remove_newline' : True ,
5163 }
5264
5365 tfields = list ()
@@ -61,9 +73,13 @@ def create_table(conn: connection, fname: str, fields: Series) -> dict:
6173
6274 tfields .append (f'"{ tfield } " { DATA_TYPES [ftype .name ]} ' )
6375
64- conn .cursor ().execute (f"create table { tname } (id serial, { ',' .join (tfields )} )" )
76+ conn .cursor ().execute (
77+ f"create table { tname } "
78+ f" (id serial, { ',' .join (tfields )} )"
79+ )
6580 return meta
6681
82+
6783def load (conn : connection , csv_file : Path ) -> dict :
6884 df = pd .read_csv (csv_file , iterator = True , chunksize = 10000 )
6985
@@ -76,7 +92,14 @@ def load(conn: connection, csv_file: Path) -> dict:
7692
7793 fields = ',' .join ([f'"{ f } "' for f in meta ['tablefields' ]])
7894 with csv_file .open ('r' ) as f :
79- conn .cursor ().copy_expert (f"copy { meta ['tablename' ]} ({ fields } ) from stdin with header csv" , f )
95+ conn .cursor ().copy_expert (
96+ f"copy { meta ['tablename' ]} ({ fields } )"
97+ f" from stdin with header csv" , f
98+ )
99+ with csv_file .open ('rb' ) as f :
100+ f .seek (- 1 , os .SEEK_END )
101+ if f .read (1 ) == b'\n ' :
102+ meta ['remove_newline' ] = False
80103
81104 return meta
82105
@@ -86,7 +109,8 @@ def load(conn: connection, csv_file: Path) -> dict:
86109 parser .add_argument (
87110 "--just-compile" ,
88111 action = "store_true" ,
89- help = "Don't do actual load but just compile the script to speedup the first load" ,
112+ help = "Don't do actual load but just compile"
113+ " the script to speedup the first load" ,
90114 )
91115 args = parser .parse_args ()
92116 if args .just_compile :
0 commit comments