 from typing import List, Tuple, Dict
 import logging
+import os
 
 import pandas as pd  # type: ignore
 
-from pyspark import sql
+from pyspark.sql.functions import pandas_udf, PandasUDFType, spark_partition_id
+from pyspark.sql.types import TimestampType
+from pyspark.sql import DataFrame
 
 from awswrangler.exceptions import MissingBatchDetected, UnsupportedFileFormat
 
@@ -35,7 +38,7 @@ def _extract_casts(dtypes):
     def date2timestamp(dataframe):
         for name, dtype in dataframe.dtypes:
             if dtype == "date":
-                dataframe = dataframe.withColumn(name, dataframe[name].cast(sql.types.TimestampType()))
+                dataframe = dataframe.withColumn(name, dataframe[name].cast(TimestampType()))
                 logger.warning(f"Casting column {name} from date to timestamp!")
         return dataframe
 
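For context, a minimal sketch of what the rewritten cast does (the session and data below are invented for illustration, not part of the commit):

```python
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType

# Hypothetical local session and single-column DataFrame with a DateType column.
spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(datetime.date(2019, 10, 1),)], ["day"])

# Same pattern as the hunk above: cast every DateType column to TimestampType.
df = df.withColumn("day", df["day"].cast(TimestampType()))
df.printSchema()  # day: timestamp (nullable = true)
```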
@@ -93,9 +96,13 @@ def to_redshift(
         spark.conf.set("spark.sql.execution.arrow.enabled", "true")
         session_primitives = self._session.primitives
 
-        @sql.functions.pandas_udf(returnType="objects_paths string",
-                                  functionType=sql.functions.PandasUDFType.GROUPED_MAP)
+        @pandas_udf(returnType="objects_paths string", functionType=PandasUDFType.GROUPED_MAP)
         def write(pandas_dataframe):
+            # Export the ARROW_PRE_0_15_IPC_FORMAT environment variable as a
+            # temporary workaround while waiting for Apache Arrow updates:
+            # https://stackoverflow.com/questions/58273063/pandasudf-and-pyarrow-0-15-0
+            os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
+
             del pandas_dataframe["aws_data_wrangler_internal_partition_id"]
             paths = session_primitives.session.pandas.to_parquet(dataframe=pandas_dataframe,
                                                                  path=path,
@@ -106,7 +113,7 @@ def write(pandas_dataframe):
             return pd.DataFrame.from_dict({"objects_paths": paths})
 
         df_objects_paths = dataframe.repartition(numPartitions=num_partitions) \
-            .withColumn("aws_data_wrangler_internal_partition_id", sql.functions.spark_partition_id()) \
+            .withColumn("aws_data_wrangler_internal_partition_id", spark_partition_id()) \
             .groupby("aws_data_wrangler_internal_partition_id") \
             .apply(write)
 
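A self-contained sketch of the same GROUPED_MAP pattern (the `df`, `process`, and column names below are illustrative, not from the commit): every row is tagged with its physical partition id, grouped on it, and each group arrives in the UDF as one pandas DataFrame. Note that the `ARROW_PRE_0_15_IPC_FORMAT` export has to happen *inside* the UDF body, because it must be visible to the executor's Python worker process, not just the driver.

```python
import os

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType, spark_partition_id

# Illustrative GROUPED_MAP UDF: receives one pandas DataFrame per group and
# must return a pandas DataFrame matching the declared schema.
@pandas_udf(returnType="result string", functionType=PandasUDFType.GROUPED_MAP)
def process(pandas_dataframe):
    # pyarrow >= 0.15 workaround: must run in the executor's Python worker.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    del pandas_dataframe["partition_id"]
    return pd.DataFrame.from_dict({"result": [f"rows={len(pandas_dataframe)}"]})

# `df` is assumed to be an existing Spark DataFrame.
results = df.repartition(numPartitions=4) \
    .withColumn("partition_id", spark_partition_id()) \
    .groupby("partition_id") \
    .apply(process)
```

Setting `spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1` in the Spark configuration should achieve the same effect, but exporting the variable inside the UDF keeps the workaround next to the code that needs it.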
@@ -255,7 +262,7 @@ def _flatten_struct_column(path: str, dtype: str) -> List[Tuple[str, str]]:
         return cols
 
     @staticmethod
-    def _flatten_struct_dataframe(df: sql.DataFrame, explode_outer: bool = True,
+    def _flatten_struct_dataframe(df: DataFrame, explode_outer: bool = True,
                                   explode_pos: bool = True) -> List[Tuple[str, str, str]]:
         explode: str = "EXPLODE_OUTER" if explode_outer is True else "EXPLODE"
         explode = f"POS{explode}" if explode_pos is True else explode
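The two flags select between Spark SQL's four generator functions. A small sketch of the difference, assuming a SparkSession named `spark` (the data is invented):

```python
df = spark.createDataFrame([(1, ["a", "b"]), (2, None)], ["id", "arr"])

# EXPLODE drops row 2 (NULL array); EXPLODE_OUTER keeps it with a NULL value.
df.selectExpr("id", "EXPLODE_OUTER(arr) AS value").show()

# The POS* variants additionally emit each element's position in the array.
df.selectExpr("id", "POSEXPLODE_OUTER(arr) AS (pos, value)").show()
```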
@@ -294,8 +301,8 @@ def _build_name(name: str, expr: str) -> str:
         return f"{name}_{suffix}".replace(".", "_")
 
     @staticmethod
-    def flatten(dataframe: sql.DataFrame, explode_outer: bool = True, explode_pos: bool = True,
-                name: str = "root") -> Dict[str, sql.DataFrame]:
+    def flatten(dataframe: DataFrame, explode_outer: bool = True, explode_pos: bool = True,
+                name: str = "root") -> Dict[str, DataFrame]:
         """
         Convert a complex nested DataFrame into one (or many) flat DataFrames.
         If a column is a struct, it is flattened directly.
@@ -311,7 +318,7 @@ def flatten(dataframe: sql.DataFrame, explode_outer: bool = True, explode_pos: b
                                                                                  explode_pos=explode_pos)
         exprs_arr: List[str] = [x[2] for x in cols_exprs if Spark._is_array_or_map(x[1])]
         exprs: List[str] = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1])]
-        dfs: Dict[str, sql.DataFrame] = {name: dataframe.selectExpr(exprs)}
+        dfs: Dict[str, DataFrame] = {name: dataframe.selectExpr(exprs)}
         exprs = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1]) and not x[0].endswith("_pos")]
         for expr in exprs_arr:
             df_arr = dataframe.selectExpr(exprs + [expr])
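Putting it together, a hedged usage sketch (the nested input is invented): `flatten` returns a dict containing the fully flattened root DataFrame plus one additional DataFrame per exploded array or map column.

```python
from awswrangler.spark import Spark  # flatten is a staticmethod on this class

# Invented nested input: one struct column and one array column.
df = spark.sql("""
    SELECT 1 AS id,
           NAMED_STRUCT('a', 10, 'b', 20) AS info,
           ARRAY('x', 'y') AS tags
""")

dfs = Spark.flatten(dataframe=df, explode_outer=True, explode_pos=True, name="root")
for table_name, flat_df in dfs.items():
    # Expect "root" (struct fields flattened in place) plus one DataFrame
    # for the exploded "tags" array (e.g. "root_tags").
    print(table_name)
    flat_df.show()
```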