 import csv
 from datetime import datetime

-import pandas
-import pyarrow
-from pyarrow import parquet
+import pandas as pd
+import pyarrow as pa
+from pyarrow import parquet as pq

 from awswrangler import data_types
 from awswrangler.exceptions import (UnsupportedWriteMode,
@@ -239,21 +239,20 @@ def _read_csv_iterator(
                 lineterminator=lineterminator)
             forgotten_bytes = len(body[last_char:])

-            df = pandas.read_csv(
-                StringIO(body[:last_char].decode("utf-8")),
-                header=header,
-                names=names,
-                usecols=usecols,
-                sep=sep,
-                quotechar=quotechar,
-                quoting=quoting,
-                escapechar=escapechar,
-                parse_dates=parse_dates,
-                infer_datetime_format=infer_datetime_format,
-                lineterminator=lineterminator,
-                dtype=dtype,
-                encoding=encoding,
-                converters=converters)
+            df = pd.read_csv(StringIO(body[:last_char].decode("utf-8")),
+                             header=header,
+                             names=names,
+                             usecols=usecols,
+                             sep=sep,
+                             quotechar=quotechar,
+                             quoting=quoting,
+                             escapechar=escapechar,
+                             parse_dates=parse_dates,
+                             infer_datetime_format=infer_datetime_format,
+                             lineterminator=lineterminator,
+                             dtype=dtype,
+                             encoding=encoding,
+                             converters=converters)
             yield df
             if count == 1:  # first chunk
                 names = df.columns
@@ -402,7 +401,7 @@ def _read_csv_once(
             Key=key_path,
             Fileobj=buff)
         buff.seek(0),
-        dataframe = pandas.read_csv(
+        dataframe = pd.read_csv(
             buff,
             header=header,
             names=names,
@@ -822,7 +821,7 @@ def _data_to_s3_object_writer(dataframe,
                                    extra_args=None,
                                    isolated_dataframe=False):
         fs = s3.get_fs(session_primitives=session_primitives)
-        fs = pyarrow.filesystem._ensure_filesystem(fs)
+        fs = pa.filesystem._ensure_filesystem(fs)
         s3.mkdir_if_not_exists(fs, path)

         if compression is None:
@@ -834,7 +833,7 @@ def _data_to_s3_object_writer(dataframe,
         else:
             raise InvalidCompression(compression)

-        guid = pyarrow.compat.guid()
+        guid = pa.compat.guid()
         if file_format == "parquet":
             outfile = f"{guid}.parquet{compression_end}"
         elif file_format == "csv":
@@ -905,9 +904,9 @@ def write_parquet_dataframe(dataframe, path, preserve_index, compression,
                 logger.debug(f"Casting column {name} Int64 to float64")

         # Converting Pandas Dataframe to Pyarrow's Table
-        table = pyarrow.Table.from_pandas(df=dataframe,
-                                          preserve_index=preserve_index,
-                                          safe=False)
+        table = pa.Table.from_pandas(df=dataframe,
+                                     preserve_index=preserve_index,
+                                     safe=False)

         # Casting on Pyarrow
         if cast_columns:
@@ -923,11 +922,11 @@ def write_parquet_dataframe(dataframe, path, preserve_index, compression,

         # Persisting on S3
         with fs.open(path, "wb") as f:
-            parquet.write_table(table,
-                                f,
-                                compression=compression,
-                                coerce_timestamps="ms",
-                                flavor="spark")
+            pq.write_table(table,
+                           f,
+                           compression=compression,
+                           coerce_timestamps="ms",
+                           flavor="spark")

         # Casting back on Pandas if necessary
         if isolated_dataframe is False:
@@ -1047,7 +1046,7 @@ def read_log_query(self,
                 col_name = col["field"]
                 new_row[col_name] = col["value"]
             pre_df.append(new_row)
-        return pandas.DataFrame(pre_df)
+        return pd.DataFrame(pre_df)

     @staticmethod
     def normalize_columns_names_athena(dataframe, inplace=True):
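
For reference, here is a minimal standalone sketch (not part of the diff) of the pandas-to-pyarrow-to-Parquet path that the write_parquet_dataframe hunks exercise, using the same pd/pa/pq aliases this change introduces. The sample DataFrame and the local output file name are made up for illustration, and it writes to the local filesystem instead of S3:

    # Hypothetical example; mirrors the library's from_pandas/write_table calls.
    import pandas as pd
    import pyarrow as pa
    from pyarrow import parquet as pq

    # Made-up sample data standing in for a user DataFrame.
    df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

    # Convert the Pandas DataFrame to a PyArrow Table.
    table = pa.Table.from_pandas(df=df, preserve_index=False, safe=False)

    # Write the Table as a Parquet file; coerce_timestamps and flavor match the diff.
    pq.write_table(table,
                   "example.parquet",
                   compression="snappy",
                   coerce_timestamps="ms",
                   flavor="spark")

Here flavor="spark" asks PyArrow to keep the file compatible with Spark readers (for example by sanitizing column names Spark cannot handle), and coerce_timestamps="ms" stores timestamps at millisecond resolution.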