
Commit c53aad7

SNOW-1947479 Add bulk_upload_chunks parameter to write_pandas (#2322)
1 parent 07230cf commit c53aad7

3 files changed: 67 additions and 6 deletions


DESCRIPTION.md

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,8 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
 - v3.16(TBD)
   - Bumped numpy dependency from <2.1.0 to <=2.2.4
   - Added Windows support for Python 3.13.
+  - Added `bulk_upload_chunks` parameter to the `write_pandas` function. When set to True, `write_pandas` first writes all data chunks to the local disk and then uploads the chunk folder to the stage with a single wildcard upload; by default, chunks are saved, uploaded, and deleted one at a time.
+
 
 - v3.15.1(May 20, 2025)
   - Added basic arrow support for Interval types.
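
For orientation, a minimal usage sketch of the new parameter is shown below; the connection arguments and the table name are placeholders and not part of this commit:

# Hedged usage sketch: account, credentials, and table name are placeholders.
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

df = pd.DataFrame(
    [("Dash", 50), ("Luke", 20), ("Mark", 10), ("John", 30)],
    columns=["name", "points"],
)

with snowflake.connector.connect(
    account="<account>",
    user="<user>",
    password="<password>",
    database="<database>",
    schema="<schema>",
    warehouse="<warehouse>",
) as cnx:
    # With bulk_upload_chunks=True, every parquet chunk is written to a local
    # temporary folder first and then pushed to the stage in one wildcard upload,
    # instead of being saved, uploaded, and deleted one chunk at a time.
    success, nchunks, nrows, _ = write_pandas(
        cnx,
        df,
        "USERS_POINTS",
        auto_create_table=True,
        chunk_size=1,
        bulk_upload_chunks=True,
    )
    print(success, nchunks, nrows)

Leaving bulk_upload_chunks at its default of False keeps the previous per-chunk upload behaviour.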

src/snowflake/connector/pandas_tools.py

Lines changed: 19 additions & 6 deletions
@@ -260,6 +260,7 @@ def write_pandas(
     table_type: Literal["", "temp", "temporary", "transient"] = "",
     use_logical_type: bool | None = None,
     iceberg_config: dict[str, str] | None = None,
+    bulk_upload_chunks: bool = False,
     **kwargs: Any,
 ) -> tuple[
     bool,
@@ -331,6 +332,8 @@ def write_pandas(
             * base_location: the base directory that snowflake can write iceberg metadata and files to
             * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
             * storage_serialization_policy: specifies the storage serialization policy for the table
+        bulk_upload_chunks: If set to True, chunks are uploaded to the stage with a single wildcard upload.
+            This is faster, but instead of uploading and cleaning up each chunk separately, all chunks are uploaded at once and the locally stored chunks are cleaned up afterwards.

 
 
@@ -437,17 +440,27 @@ def write_pandas(
             chunk_path = os.path.join(tmp_folder, f"file{i}.txt")
             # Dump chunk into parquet file
             chunk.to_parquet(chunk_path, compression=compression, **kwargs)
-            # Upload parquet file
-            path = chunk_path.replace("\\", "\\\\").replace("'", "\\'")
+            if not bulk_upload_chunks:
+                # Upload parquet file chunk right away
+                path = chunk_path.replace("\\", "\\\\").replace("'", "\\'")
+                cursor._upload(
+                    local_file_name=f"'file://{path}'",
+                    stage_location="@" + stage_location,
+                    options={"parallel": parallel, "source_compression": "auto_detect"},
+                )
+
+                # Remove chunk file
+                os.remove(chunk_path)
+
+        if bulk_upload_chunks:
+            # Upload tmp directory with parquet chunks
+            path = tmp_folder.replace("\\", "\\\\").replace("'", "\\'")
             cursor._upload(
-                local_file_name=f"'file://{path}'",
+                local_file_name=f"'file://{path}/*'",
                 stage_location="@" + stage_location,
                 options={"parallel": parallel, "source_compression": "auto_detect"},
             )
 
-            # Remove chunk file
-            os.remove(chunk_path)
-
     # in Snowflake, all parquet data is stored in a single column, $1, so we must select columns explicitly
     # see (https://docs.snowflake.com/en/user-guide/script-data-load-transform-parquet.html)
     if quote_identifiers:
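
To make the branching above easier to follow outside the diff, the sketch below illustrates the per-chunk versus wildcard upload strategies; upload_to_stage and write_chunks are hypothetical stand-ins for illustration only and are not part of the connector:

# Minimal sketch of per-chunk vs. bulk (wildcard) upload; upload_to_stage is a
# hypothetical placeholder that only prints the upload it would perform.
import os
import tempfile

import pandas as pd


def upload_to_stage(local_path: str) -> None:
    # Placeholder for cursor._upload(); a real implementation would PUT to a stage.
    print(f"PUT 'file://{local_path}' @mystage")


def write_chunks(df: pd.DataFrame, chunk_size: int, bulk_upload_chunks: bool) -> None:
    with tempfile.TemporaryDirectory() as tmp_folder:
        for i in range(0, len(df), chunk_size):
            chunk_path = os.path.join(tmp_folder, f"file{i}.parquet")
            df.iloc[i : i + chunk_size].to_parquet(chunk_path)
            if not bulk_upload_chunks:
                # Default behaviour: upload each chunk right away, then delete it.
                upload_to_stage(chunk_path)
                os.remove(chunk_path)
        if bulk_upload_chunks:
            # Bulk behaviour: one wildcard upload of the whole folder; the
            # TemporaryDirectory cleanup then removes all chunks at once.
            upload_to_stage(os.path.join(tmp_folder, "*"))


write_chunks(pd.DataFrame({"a": range(10)}), chunk_size=3, bulk_upload_chunks=True)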

test/integ/pandas/test_pandas_tools.py

Lines changed: 46 additions & 0 deletions
@@ -1139,3 +1139,49 @@ def test_pandas_with_single_quote(
         )
     finally:
         cnx.execute_string(f"drop table if exists {table_name}")
+
+
+@pytest.mark.parametrize("bulk_upload_chunks", [True, False])
+def test_write_pandas_bulk_chunks_upload(conn_cnx, bulk_upload_chunks):
+    """Tests that write_pandas uploads chunks correctly with and without bulk chunk upload."""
+    random_table_name = random_string(5, "userspoints_")
+    df_data = [("Dash", 50), ("Luke", 20), ("Mark", 10), ("John", 30)]
+    df = pandas.DataFrame(df_data, columns=["name", "points"])
+
+    table_name = random_table_name
+    col_id = "id"
+    col_name = "name"
+    col_points = "points"
+
+    create_sql = (
+        f"CREATE OR REPLACE TABLE {table_name}"
+        f"({col_name} STRING, {col_points} INT, {col_id} INT AUTOINCREMENT)"
+    )
+
+    select_count_sql = f"SELECT count(*) FROM {table_name}"
+    drop_sql = f"DROP TABLE IF EXISTS {table_name}"
+    with conn_cnx() as cnx:  # type: SnowflakeConnection
+        cnx.execute_string(create_sql)
+        try:
+            # Write dataframe in four 1-row chunks
+            success, nchunks, nrows, _ = write_pandas(
+                cnx,
+                df,
+                random_table_name,
+                quote_identifiers=False,
+                auto_create_table=False,
+                overwrite=True,
+                index=True,
+                on_error="continue",
+                chunk_size=1,
+                bulk_upload_chunks=bulk_upload_chunks,
+            )
+            # Check write_pandas output
+            assert success
+            assert nchunks == 4
+            assert nrows == 4
+            result = cnx.cursor(DictCursor).execute(select_count_sql).fetchone()
+            # Check number of rows
+            assert result["COUNT(*)"] == 4
+        finally:
+            cnx.execute_string(drop_sql)
