@@ -1,5 +1,3 @@
-# flake8: noqa: F821
-
 import base64
 import hashlib
 import hmac
@@ -17,7 +15,6 @@
 from dateutil.relativedelta import *
 from pyathena import connect
 
-
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
@@ -115,7 +112,10 @@ def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload
 
 
 def dump_dataframe(response, location, filename):
-    df = pd.DataFrame.from_dict(response.json(), orient="columns")
+    df = pd.DataFrame.from_dict(
+        response.json(),
+        orient="columns",
+    )
 
     df["import_year"] = datetime.today().strftime("%Y")
     df["import_month"] = datetime.today().strftime("%m")
@@ -125,6 +125,9 @@ def dump_dataframe(response, location, filename):
     print(f"Database: {target_database}")
     print(f"Table: {target_table}")
 
+    dict_values = ["string" for _ in range(len(df.columns))]
+    dtype_dict = dict(zip(df.columns, dict_values))
+
     # write to s3
     wr.s3.to_parquet(
         df=df,
@@ -134,6 +137,7 @@ def dump_dataframe(response, location, filename):
         table=target_table,
         mode="overwrite_partitions",
         partition_cols=partition_keys,
+        dtype=dtype_dict,
     )
     print(f"Dumped Dataframe {df.shape} to {s3_target_location}")
     logger.info(f"Dumped Dataframe {df.shape} to {s3_target_location}")
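
For context on the change above: the dtype argument tells awswrangler's wr.s3.to_parquet to cast every column to the Glue/Athena "string" type at write time, which keeps the table schema identical across overwrite_partitions runs even when pandas infers different dtypes from one API response to the next. Below is a minimal, self-contained sketch of the pattern; the bucket, database, and table names are hypothetical placeholders standing in for the script's s3_target_location, target_database, and target_table, and the dict comprehension is equivalent to the commit's dict(zip(...)) construction.

# Minimal sketch, assuming awswrangler and pandas are installed;
# all AWS resource names below are hypothetical placeholders.
import awswrangler as wr
import pandas as pd
from datetime import datetime

df = pd.DataFrame({"order_id": [1, 2], "amount": [9.99, 5.00]})

# Date-based partition columns, mirroring the diff above.
df["import_year"] = datetime.today().strftime("%Y")
df["import_month"] = datetime.today().strftime("%m")

# Map every column to the Glue/Athena "string" type; equivalent to the
# commit's dict(zip(df.columns, ["string", ...])) construction.
dtype_dict = {col: "string" for col in df.columns}

wr.s3.to_parquet(
    df=df,
    path="s3://example-bucket/reports/",  # hypothetical S3 location
    dataset=True,
    database="example_db",  # hypothetical Glue database
    table="example_table",  # hypothetical Glue table
    mode="overwrite_partitions",
    partition_cols=["import_year", "import_month"],
    dtype=dtype_dict,  # the cast added by this commit
)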