@@ -1211,7 +1211,7 @@ def drop_duplicated_columns(dataframe: pd.DataFrame, inplace: bool = True) -> pd
     def read_parquet(self,
                      path: Union[str, List[str]],
                      columns: Optional[List[str]] = None,
-                     filters: Optional[Union[List[Tuple[Any]], List[Tuple[Any]]]] = None,
+                     filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
                      procs_cpu_bound: Optional[int] = None) -> pd.DataFrame:
         """
         Read parquet data from S3
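The widened `filters` hint matches pyarrow's disjunctive-normal-form predicates: a flat `List[Tuple]` is a single AND group, while `List[List[Tuple]]` OR-combines several AND groups (the old hint listed the flat form twice). A minimal usage sketch, assuming the 0.x `Session` API; the bucket and partition column names are illustrative, not from this commit:

```python
import awswrangler

session = awswrangler.Session()

# Flat list: one AND group -> year == 2019 AND month == 10
df = session.pandas.read_parquet(path="s3://my-bucket/my-dataset/",
                                 filters=[("year", "=", 2019), ("month", "=", 10)])

# Nested lists (the form the fixed hint now covers): OR of AND groups
# -> (year == 2019 AND month == 10) OR (year == 2020)
df = session.pandas.read_parquet(path="s3://my-bucket/my-dataset/",
                                 filters=[[("year", "=", 2019), ("month", "=", 10)],
                                          [("year", "=", 2020)]])
```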
@@ -1274,7 +1274,7 @@ def _read_parquet_paths_remote(send_pipe: mp.connection.Connection,
                                    session_primitives: Any,
                                    path: Union[str, List[str]],
                                    columns: Optional[List[str]] = None,
-                                   filters: Optional[Union[List[Tuple[Any]], List[Tuple[Any]]]] = None,
+                                   filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
                                    procs_cpu_bound: Optional[int] = None):
         df: pd.DataFrame = Pandas._read_parquet_paths(session_primitives=session_primitives,
                                                       path=path,
@@ -1288,7 +1288,7 @@ def _read_parquet_paths_remote(send_pipe: mp.connection.Connection,
     def _read_parquet_paths(session_primitives: Any,
                             path: Union[str, List[str]],
                             columns: Optional[List[str]] = None,
-                            filters: Optional[Union[List[Tuple[Any]], List[Tuple[Any]]]] = None,
+                            filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
                             procs_cpu_bound: Optional[int] = None) -> pd.DataFrame:
         """
         Read parquet data from S3
@@ -1327,7 +1327,7 @@ def _read_parquet_paths(session_primitives: Any,
     def _read_parquet_path(session_primitives: Any,
                            path: str,
                            columns: Optional[List[str]] = None,
-                           filters: Optional[Union[List[Tuple[Any]], List[Tuple[Any]]]] = None,
+                           filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
                            procs_cpu_bound: Optional[int] = None) -> pd.DataFrame:
         """
         Read parquet data from S3
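These private helpers all thread the same `filters` value down to pyarrow, which is where the DNF shape comes from. A sketch of the underlying pyarrow call the helpers wrap, assuming the legacy `ParquetDataset` reader and a local path for brevity:

```python
import pyarrow.parquet as pq

# pyarrow accepts DNF filters (List[Tuple] or List[List[Tuple]]) and uses
# them to prune partitions before any row data is read.
dataset = pq.ParquetDataset("my-dataset/",
                            filters=[[("year", "=", 2019), ("month", "=", 10)],
                                     [("year", "=", 2020)]])
df = dataset.read(columns=["id", "value"]).to_pandas()
```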
@@ -1369,7 +1369,7 @@ def read_table(self,
                   database: str,
                   table: str,
                   columns: Optional[List[str]] = None,
-                  filters: Optional[Union[List[Tuple[Any]], List[Tuple[Any]]]] = None,
+                  filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
                   procs_cpu_bound: Optional[int] = None) -> pd.DataFrame:
         """
         Read PARQUET table from S3 using the Glue Catalog location skipping Athena's necessity
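`read_table` resolves the table's S3 location from the Glue Catalog and forwards the same `filters`, so partitioned tables can be pruned without running an Athena query. An illustrative call, reusing the session from the sketch above (database and table names are made up):

```python
df = session.pandas.read_table(database="my_db",
                               table="my_table",
                               columns=["id", "value"],
                               filters=[[("year", "=", 2020)]])
```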
@@ -1408,6 +1408,7 @@ def read_sql_redshift(self,
         temp_s3_path = temp_s3_path[:-1] if temp_s3_path[-1] == "/" else temp_s3_path
         temp_s3_path = f"{temp_s3_path}/{name}"
         logger.debug(f"temp_s3_path: {temp_s3_path}")
+        self._session.s3.delete_objects(path=temp_s3_path)
         paths: Optional[List[str]] = None
         try:
             paths = self._session.redshift.to_parquet(sql=sql,
@@ -1416,11 +1417,11 @@ def read_sql_redshift(self,
                                                       connection=connection)
             logger.debug(f"paths: {paths}")
             df: pd.DataFrame = self.read_parquet(path=paths, procs_cpu_bound=procs_cpu_bound)  # type: ignore
-            self._session.s3.delete_listed_objects(objects_paths=paths)
+            self._session.s3.delete_listed_objects(objects_paths=paths + [temp_s3_path + "/manifest"])  # type: ignore
             return df
         except Exception as e:
             if paths is not None:
-                self._session.s3.delete_listed_objects(objects_paths=paths)
+                self._session.s3.delete_listed_objects(objects_paths=paths + [temp_s3_path + "/manifest"])
             else:
                 self._session.s3.delete_objects(path=temp_s3_path)
             raise e
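Two cleanup fixes land in `read_sql_redshift`: the temporary prefix is now emptied before the Redshift UNLOAD runs, so stale files from a previous failed run cannot be read back into the result, and the `manifest` object that UNLOAD writes next to the data files is now deleted together with them instead of being left behind in S3. A hedged end-to-end sketch; the connection helper, the `iam_role` parameter, and all names are assumptions for illustration, not taken from this commit:

```python
import awswrangler

session = awswrangler.Session()
conn = session.glue.get_connection("my-redshift-connection")  # hypothetical helper

df = session.pandas.read_sql_redshift(
    sql="SELECT * FROM public.my_table",
    iam_role="arn:aws:iam::123456789012:role/my-unload-role",  # assumed parameter
    connection=conn,
    temp_s3_path="s3://my-bucket/temp/")

# With this commit, the temp prefix is wiped up front, and afterwards both
# the unloaded Parquet parts and the UNLOAD "manifest" object are removed.
conn.close()
```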