Skip to content

Commit 06d4eff

Browse files
authored
SNOW-1652349: Add support for iceberg to write_pandas (#2056)
1 parent d5871a2 commit 06d4eff

File tree

5 files changed

+254
-45
lines changed

5 files changed

+254
-45
lines changed

DESCRIPTION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
1313
- Added a <19.0.0 pin to pyarrow as a workaround to a bug affecting Azure Batch.
1414
- Optimized distribution package lookup to speed up import.
1515
- Fixed a bug where the privatelink OCSP Cache URL could not be determined if the privatelink account name was specified in uppercase.
16+
- Added support for Iceberg tables to `write_pandas`.
1617

1718
- v3.13.2(January 29, 2025)
1819
- Changed not to use scoped temporary objects.

src/snowflake/connector/pandas_tools.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,42 @@ def _create_temp_file_format(
215215
return file_format_location
216216

217217

218+
def _convert_value_to_sql_option(value: Union[str, bool, int, float]) -> str:
219+
if isinstance(value, str):
220+
if len(value) > 1 and value.startswith("'") and value.endswith("'"):
221+
return value
222+
else:
223+
value = value.replace(
224+
"'", "''"
225+
) # escape single quotes before adding a pair of quotes
226+
return f"'{value}'"
227+
else:
228+
return str(value)
229+
230+
231+
def _iceberg_config_statement_helper(iceberg_config: dict[str, str]) -> str:
    """Turn an iceberg configuration mapping into a ``KEY=value ...`` SQL fragment.

    Keys are upper-cased, entries whose value is ``None`` are dropped, and each
    remaining value is rendered through ``_convert_value_to_sql_option``.  The
    resulting pairs are joined with single spaces in the mapping's iteration
    order; an empty mapping yields an empty string.

    Raises:
        ProgrammingError: if any remaining key is not one of the allowed
            option names.
    """
    ALLOWED_CONFIGS = {
        "EXTERNAL_VOLUME",
        "CATALOG",
        "BASE_LOCATION",
        "CATALOG_SYNC",
        "STORAGE_SERIALIZATION_POLICY",
    }

    normalized = {}
    for key, raw_value in iceberg_config.items():
        if raw_value is None:
            # None means "option not set": leave it out of the statement.
            continue
        option = key.upper()
        normalized[option] = _convert_value_to_sql_option(raw_value)

    invalid_configs = normalized.keys() - ALLOWED_CONFIGS
    if invalid_configs:
        # Sorted so the error message is deterministic.
        raise ProgrammingError(
            f"Invalid iceberg configurations option(s) provided {', '.join(sorted(invalid_configs))}"
        )

    pairs = [f"{option}={rendered}" for option, rendered in normalized.items()]
    return " ".join(pairs)
252+
253+
218254
def write_pandas(
219255
conn: SnowflakeConnection,
220256
df: pandas.DataFrame,
@@ -231,6 +267,7 @@ def write_pandas(
231267
overwrite: bool = False,
232268
table_type: Literal["", "temp", "temporary", "transient"] = "",
233269
use_logical_type: bool | None = None,
270+
iceberg_config: dict[str, str] | None = None,
234271
**kwargs: Any,
235272
) -> tuple[
236273
bool,
@@ -295,6 +332,14 @@ def write_pandas(
295332
Snowflake can interpret Parquet logical types during data loading. To enable Parquet logical types,
296333
set use_logical_type as True. Set to None to use Snowflake's default. For more information, see:
297334
https://docs.snowflake.com/en/sql-reference/sql/create-file-format
335+
iceberg_config: A dictionary that can contain the following iceberg configuration values:
336+
* external_volume: specifies the identifier for the external volume where
337+
the Iceberg table stores its metadata files and data in Parquet format
338+
* catalog: specifies either Snowflake or a catalog integration to use for this table
339+
* base_location: the base directory that snowflake can write iceberg metadata and files to
340+
* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
341+
* storage_serialization_policy: specifies the storage serialization policy for the table
342+
298343
299344
300345
Returns:
@@ -479,9 +524,14 @@ def drop_object(name: str, object_type: str) -> None:
479524
quote_identifiers,
480525
)
481526

527+
iceberg = "ICEBERG " if iceberg_config else ""
528+
iceberg_config_statement = _iceberg_config_statement_helper(
529+
iceberg_config or {}
530+
)
531+
482532
create_table_sql = (
483-
f"CREATE {table_type.upper()} TABLE IF NOT EXISTS identifier(?) "
484-
f"({create_table_columns})"
533+
f"CREATE {table_type.upper()} {iceberg}TABLE IF NOT EXISTS identifier(?) "
534+
f"({create_table_columns}) {iceberg_config_statement}"
485535
f" /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
486536
)
487537
params = (target_table_location,)

test/integ/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ def conn_cnx() -> Callable[..., ContextManager[SnowflakeConnection]]:
246246
return db
247247

248248

249+
@pytest.fixture(scope="module")
def module_conn_cnx() -> Callable[..., ContextManager[SnowflakeConnection]]:
    """Module-scoped fixture that returns the ``db`` connection factory."""
    return db
252+
253+
249254
@pytest.fixture()
250255
def negative_conn_cnx() -> Callable[..., ContextManager[SnowflakeConnection]]:
251256
"""Use this if an incident is expected and we don't want GS to create a dump file about the incident."""

test/integ/pandas/test_pandas_tools.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
import math
9+
import re
910
from datetime import datetime, timedelta, timezone
1011
from typing import TYPE_CHECKING, Any, Callable, Generator
1112
from unittest import mock
@@ -26,10 +27,14 @@
2627

2728
try:
2829
from snowflake.connector.options import pandas
29-
from snowflake.connector.pandas_tools import write_pandas
30+
from snowflake.connector.pandas_tools import (
31+
_iceberg_config_statement_helper,
32+
write_pandas,
33+
)
3034
except ImportError:
3135
pandas = None
3236
write_pandas = None
37+
_iceberg_config_statement_helper = None
3338

3439
if TYPE_CHECKING:
3540
from snowflake.connector import SnowflakeConnection
@@ -1029,6 +1034,34 @@ def mock_execute(*args, **kwargs):
10291034
cnx.execute_string(f"drop schema if exists {target_schema}")
10301035

10311036

1037+
def test__iceberg_config_statement_helper():
    """Exercise rendering, None-skipping and unknown-option rejection."""
    config = {
        "EXTERNAL_VOLUME": "vol",
        "CATALOG": "'SNOWFLAKE'",
        "BASE_LOCATION": "/root",
        "CATALOG_SYNC": "foo",
        "STORAGE_SERIALIZATION_POLICY": "bar",
    }

    # Every allowed option set: plain strings get quoted, pre-quoted ones are kept.
    expected_all = "EXTERNAL_VOLUME='vol' CATALOG='SNOWFLAKE' BASE_LOCATION='/root' CATALOG_SYNC='foo' STORAGE_SERIALIZATION_POLICY='bar'"
    assert _iceberg_config_statement_helper(config) == expected_all

    # An option set to None must be left out of the statement.
    config["STORAGE_SERIALIZATION_POLICY"] = None
    expected_without_policy = "EXTERNAL_VOLUME='vol' CATALOG='SNOWFLAKE' BASE_LOCATION='/root' CATALOG_SYNC='foo'"
    assert _iceberg_config_statement_helper(config) == expected_without_policy

    # Unknown option names are reported upper-cased and sorted.
    config.update(foo=True, bar=True)
    expected_error = re.escape(
        "Invalid iceberg configurations option(s) provided BAR, FOO"
    )
    with pytest.raises(ProgrammingError, match=expected_error):
        _iceberg_config_statement_helper(config)
1063+
1064+
10321065
def test_write_pandas_with_on_error(
10331066
conn_cnx: Callable[..., Generator[SnowflakeConnection, None, None]],
10341067
):

0 commit comments

Comments
 (0)