@@ -664,12 +664,14 @@ def _read_sql_athena_regular(self,
664664 dtype , parse_timestamps , parse_dates , converters = self ._get_query_dtype (
665665 query_execution_id = query_execution_id )
666666 path = f"{ s3_output } { query_execution_id } .csv"
667+ logger .debug ("Start reading..." )
667668 ret = self .read_csv (path = path ,
668669 dtype = dtype ,
669670 parse_dates = parse_timestamps ,
670671 converters = converters ,
671672 quoting = csv .QUOTE_ALL ,
672673 max_result_size = max_result_size )
674+ logger .debug ("Start type casting..." )
673675 if max_result_size is None :
674676 if len (ret .index ) > 0 :
675677 for col in parse_dates :
@@ -1129,7 +1131,6 @@ def write_csv_dataframe(dataframe, path, preserve_index, compression, fs, extra_
11291131 elif serde == "LazySimpleSerDe" :
11301132 csv_extra_args ["quoting" ] = csv .QUOTE_NONE
11311133 csv_extra_args ["escapechar" ] = "\\ "
1132- logger .debug (f"csv_extra_args: { csv_extra_args } " )
11331134 csv_buffer : bytes = bytes (
11341135 dataframe .to_csv (None , header = False , index = preserve_index , compression = compression , ** csv_extra_args ),
11351136 "utf-8" )
@@ -1360,19 +1361,19 @@ def read_parquet(self,
13601361 """
13611362 procs_cpu_bound = procs_cpu_bound if procs_cpu_bound is not None else self ._session .procs_cpu_bound if self ._session .procs_cpu_bound is not None else 1
13621363 logger .debug (f"procs_cpu_bound: { procs_cpu_bound } " )
1363- df : Optional [pd .DataFrame ] = None
1364+ dfs : List [pd .DataFrame ] = []
13641365 session_primitives = self ._session .primitives
13651366 path = [path ] if type (path ) == str else path # type: ignore
13661367 bounders = calculate_bounders (len (path ), procs_cpu_bound )
13671368 logger .debug (f"len(bounders): { len (bounders )} " )
13681369 if len (bounders ) == 1 :
1369- df = Pandas ._read_parquet_paths (session_primitives = session_primitives ,
1370- path = path ,
1371- columns = columns ,
1372- filters = filters ,
1373- procs_cpu_bound = procs_cpu_bound ,
1374- wait_objects = wait_objects ,
1375- wait_objects_timeout = wait_objects_timeout )
1370+ dfs = Pandas ._read_parquet_paths (session_primitives = session_primitives ,
1371+ path = path ,
1372+ columns = columns ,
1373+ filters = filters ,
1374+ procs_cpu_bound = procs_cpu_bound ,
1375+ wait_objects = wait_objects ,
1376+ wait_objects_timeout = wait_objects_timeout )
13761377 else :
13771378 procs = []
13781379 receive_pipes = []
@@ -1398,15 +1399,16 @@ def read_parquet(self,
13981399 logger .debug (f"len(procs): { len (bounders )} " )
13991400 for i in range (len (procs )):
14001401 logger .debug (f"Waiting pipe number: { i } " )
1401- df_received = receive_pipes [i ].recv ()
1402- if df is None :
1403- df = df_received
1404- else :
1405- df = pd .concat (objs = [df , df_received ], ignore_index = True )
1402+ dfs_received : List [pd .DataFrame ] = receive_pipes [i ].recv ()
1403+ dfs = dfs_received + dfs
14061404 logger .debug (f"Waiting proc number: { i } " )
14071405 procs [i ].join ()
14081406 logger .debug (f"Closing proc number: { i } " )
14091407 receive_pipes [i ].close ()
1408+ if len (dfs ) == 1 :
1409+ df : pd .DataFrame = dfs [0 ]
1410+ else :
1411+ df = pd .concat (objs = dfs , ignore_index = True )
14101412 return df
14111413
14121414 @staticmethod
@@ -1418,14 +1420,14 @@ def _read_parquet_paths_remote(send_pipe: mp.connection.Connection,
14181420 procs_cpu_bound : Optional [int ] = None ,
14191421 wait_objects : bool = False ,
14201422 wait_objects_timeout : Optional [float ] = 10.0 ):
1421- df : pd .DataFrame = Pandas ._read_parquet_paths (session_primitives = session_primitives ,
1422- path = path ,
1423- columns = columns ,
1424- filters = filters ,
1425- procs_cpu_bound = procs_cpu_bound ,
1426- wait_objects = wait_objects ,
1427- wait_objects_timeout = wait_objects_timeout )
1428- send_pipe .send (df )
1423+ dfs : List [ pd .DataFrame ] = Pandas ._read_parquet_paths (session_primitives = session_primitives ,
1424+ path = path ,
1425+ columns = columns ,
1426+ filters = filters ,
1427+ procs_cpu_bound = procs_cpu_bound ,
1428+ wait_objects = wait_objects ,
1429+ wait_objects_timeout = wait_objects_timeout )
1430+ send_pipe .send (dfs )
14291431 send_pipe .close ()
14301432
14311433 @staticmethod
@@ -1435,7 +1437,7 @@ def _read_parquet_paths(session_primitives: "SessionPrimitives",
14351437 filters : Optional [Union [List [Tuple [Any ]], List [List [Tuple [Any ]]]]] = None ,
14361438 procs_cpu_bound : Optional [int ] = None ,
14371439 wait_objects : bool = False ,
1438- wait_objects_timeout : Optional [float ] = 10.0 ) -> pd .DataFrame :
1440+ wait_objects_timeout : Optional [float ] = 10.0 ) -> List [ pd .DataFrame ] :
14391441 """
14401442 Read parquet data from S3
14411443
@@ -1459,24 +1461,19 @@ def _read_parquet_paths(session_primitives: "SessionPrimitives",
14591461 procs_cpu_bound = procs_cpu_bound ,
14601462 wait_objects = wait_objects ,
14611463 wait_objects_timeout = wait_objects_timeout )
1464+ return [df ]
14621465 else :
1463- df = Pandas ._read_parquet_path (session_primitives = session_primitives ,
1464- path = path [0 ],
1465- columns = columns ,
1466- filters = filters ,
1467- procs_cpu_bound = procs_cpu_bound ,
1468- wait_objects = wait_objects ,
1469- wait_objects_timeout = wait_objects_timeout )
1470- for p in path [1 :]:
1471- df_aux = Pandas ._read_parquet_path (session_primitives = session_primitives ,
1472- path = p ,
1473- columns = columns ,
1474- filters = filters ,
1475- procs_cpu_bound = procs_cpu_bound ,
1476- wait_objects = wait_objects ,
1477- wait_objects_timeout = wait_objects_timeout )
1478- df = pd .concat (objs = [df , df_aux ], ignore_index = True )
1479- return df
1466+ dfs : List [pd .DataFrame ] = []
1467+ for p in path :
1468+ df = Pandas ._read_parquet_path (session_primitives = session_primitives ,
1469+ path = p ,
1470+ columns = columns ,
1471+ filters = filters ,
1472+ procs_cpu_bound = procs_cpu_bound ,
1473+ wait_objects = wait_objects ,
1474+ wait_objects_timeout = wait_objects_timeout )
1475+ dfs .append (df )
1476+ return dfs
14801477
14811478 @staticmethod
14821479 def _read_parquet_path (session_primitives : "SessionPrimitives" ,
@@ -1851,17 +1848,17 @@ def read_csv_list(
18511848 procs .append (proc )
18521849 receive_pipes .append (receive_pipe )
18531850 utils .wait_process_release (processes = procs , target_number = procs_cpu_bound )
1851+ dfs : List [pd .DataFrame ] = []
18541852 for i in range (len (procs )):
18551853 logger .debug (f"Waiting pipe number: { i } " )
18561854 df_received = receive_pipes [i ].recv ()
1857- if df is None :
1858- df = df_received
1859- else :
1860- df = pd .concat (objs = [df , df_received ], ignore_index = True )
1855+ dfs .append (df_received )
18611856 logger .debug (f"Waiting proc number: { i } " )
18621857 procs [i ].join ()
18631858 logger .debug (f"Closing proc number: { i } " )
18641859 receive_pipes [i ].close ()
1860+ logger .debug (f"Concatenating all { len (paths )} DataFrames..." )
1861+ df = pd .concat (objs = dfs , ignore_index = True )
18651862 return df
18661863
18671864 def _read_csv_list_iterator (
0 commit comments