Skip to content

Commit 1b50043

Browse files
committed
Add cast_columns to Pandas.to_parquet.
1 parent a4df88b commit 1b50043

File tree

4 files changed: 211 additions (+211) and 118 deletions (-118).

awswrangler/glue.py

Lines changed: 24 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -58,23 +58,21 @@ def _type_athena2python(dtype):
5858
else:
5959
raise UnsupportedType(f"Unsupported Athena type: {dtype}")
6060

61-
def metadata_to_glue(
62-
self,
63-
dataframe,
64-
path,
65-
objects_paths,
66-
file_format,
67-
database=None,
68-
table=None,
69-
partition_cols=None,
70-
preserve_index=True,
71-
mode="append",
72-
):
73-
schema = Glue._build_schema(
74-
dataframe=dataframe,
75-
partition_cols=partition_cols,
76-
preserve_index=preserve_index,
77-
)
61+
def metadata_to_glue(self,
62+
dataframe,
63+
path,
64+
objects_paths,
65+
file_format,
66+
database=None,
67+
table=None,
68+
partition_cols=None,
69+
preserve_index=True,
70+
mode="append",
71+
cast_columns=None):
72+
schema = Glue._build_schema(dataframe=dataframe,
73+
partition_cols=partition_cols,
74+
preserve_index=preserve_index,
75+
cast_columns=cast_columns)
7876
table = table if table else Glue._parse_table_name(path)
7977
table = table.lower().replace(".", "_")
8078
if mode == "overwrite":
@@ -155,7 +153,10 @@ def get_connection_details(self, name):
155153
Name=name, HidePassword=False)["Connection"]
156154

157155
@staticmethod
158-
def _build_schema(dataframe, partition_cols, preserve_index):
156+
def _build_schema(dataframe,
157+
partition_cols,
158+
preserve_index,
159+
cast_columns=None):
159160
if not partition_cols:
160161
partition_cols = []
161162
schema_built = []
@@ -169,10 +170,14 @@ def _build_schema(dataframe, partition_cols, preserve_index):
169170
schema_built.append((name, athena_type))
170171
for col in dataframe.columns:
171172
name = str(col)
172-
dtype = str(dataframe[name].dtype)
173+
if cast_columns and name in cast_columns:
174+
dtype = cast_columns[name]
175+
else:
176+
dtype = str(dataframe[name].dtype)
173177
if name not in partition_cols:
174178
athena_type = Glue._type_pandas2athena(dtype)
175179
schema_built.append((name, athena_type))
180+
logger.debug(f"schema_built:\n{schema_built}")
176181
return schema_built
177182

178183
@staticmethod

0 commit comments

Comments
 (0)