Skip to content

Commit c95a5d0

Browse files
authored
fix: respect order of columns in to_iceberg (#2768)
1 parent de48a86 commit c95a5d0

File tree

2 files changed, with 53 additions and 1 deletion.

awswrangler/athena/_write_iceberg.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -499,7 +499,7 @@ def to_iceberg(
499499
"""
500500
else:
501501
sql_statement = f"""
502-
INSERT INTO "{database}"."{table}"
502+
INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])})
503503
SELECT {', '.join([f'"{x}"' for x in df.columns])}
504504
FROM "{database}"."{temp_table}"
505505
"""

tests/unit/test_athena_iceberg.py

Lines changed: 52 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -650,6 +650,58 @@ def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str,
650650
assert_pandas_equals(df_expected, df_out)
651651

652652

653+
def test_athena_to_iceberg_cols_order(path: str, path2: str, glue_database: str, glue_table: str) -> None:
    """Check that ``to_iceberg`` matches DataFrame columns to table columns by name.

    Three writes are issued with ``mode="overwrite_partitions"`` against the
    same table, partitioned on ``partition``:

    1. the initial frame (``partition``, ``column1``, ``column2``);
    2. a frame for partition 2 that appends ``new_column`` as the last column;
    3. a frame for partition 2 with the same columns, but with ``new_column``
       placed before ``column2``.

    The table is then read back. Besides the row and column counts, the
    values of each column are checked, so that data landing in the wrong
    column is detected rather than only a change in shape.
    """
    kwargs = {
        "database": glue_database,
        "table": glue_table,
        "table_location": path,
        "temp_path": path2,
        "partition_cols": ["partition"],
        "schema_evolution": True,
        "keep_files": False,
    }

    df = pd.DataFrame(
        {
            "partition": [1, 1, 2, 2],
            "column1": ["X", "Y", "Z", "Z"],
            "column2": ["A", "B", "C", "D"],
        }
    )
    wr.athena.to_iceberg(df=df, mode="overwrite_partitions", **kwargs)

    # Adding a column
    df_new_col_last = pd.DataFrame(
        {
            "partition": [2, 2],
            "column1": ["Z", "Z"],
            "column2": ["C", "D"],
            "new_column": [True, False],
        }
    )
    wr.athena.to_iceberg(df=df_new_col_last, mode="overwrite_partitions", **kwargs)

    # Switching the order of columns
    df_new_col_not_last = pd.DataFrame(
        {
            "partition": [2, 2],
            "column1": ["Z", "Z"],
            "new_column": [True, False],
            "column2": ["C", "D"],
        }
    )
    wr.athena.to_iceberg(df=df_new_col_not_last, mode="overwrite_partitions", **kwargs)

    df_out = wr.athena.read_sql_query(
        sql=f'SELECT * FROM "{glue_table}"',
        database=glue_database,
        ctas_approach=False,
        unload_approach=False,
    )
    assert len(df) == len(df_out)
    assert len(df.columns) + 1 == len(df_out.columns)

    # Counts alone cannot tell whether values ended up under the right column
    # name, so compare the contents too (sorted: row order is not guaranteed).
    assert sorted(df_out["column1"].tolist()) == ["X", "Y", "Z", "Z"]
    assert sorted(df_out["column2"].tolist()) == ["A", "B", "C", "D"]

    # Only partition 2 was written with ``new_column``; restrict the check to
    # those rows so missing values in the other partition do not interfere.
    rewritten = df_out[df_out["partition"] == 2]
    assert sorted(rewritten["column2"].tolist()) == ["C", "D"]
    assert sorted(rewritten["new_column"].tolist()) == [False, True]
703+
704+
653705
def test_athena_to_iceberg_empty_df_error(
654706
path: str,
655707
path2: str,

0 commit comments

Comments
 (0)