-
-
Notifications
You must be signed in to change notification settings - Fork 25
Open
Description
Detail Bug Report
Summary
- Context: The
write_pandasfunction infakesnow/pandas_tools.pyis a mock implementation of Snowflake'spandas_tools.write_pandas()that inserts pandas DataFrame data into a database table. - Bug: The
overwriteparameter is accepted but completely ignored, causing data to be appended instead of replaced. - Actual vs. expected: When
overwrite=True, data is appended to the existing table; the expected behavior is to truncate the table first (whenauto_create_table=False) or drop and recreate it (whenauto_create_table=True). - Impact: Users expecting data to be replaced will instead have duplicate or incorrect data, leading to data integrity issues.
Code with bug
def write_pandas(
conn: FakeSnowflakeConnection,
df: pd.DataFrame,
table_name: str,
database: str | None = None,
schema: str | None = None,
chunk_size: int | None = None,
compression: str = "gzip",
on_error: str = "abort_statement",
parallel: int = 4,
quote_identifiers: bool = True,
auto_create_table: bool = False,
create_temp_table: bool = False,
overwrite: bool = False, # <-- BUG π΄ Parameter accepted but never used
table_type: Literal["", "temp", "temporary", "transient"] = "",
**kwargs: Any,
) -> WritePandasResult:
name = table_name
if schema:
name = f"{schema}.{name}"
if database:
name = f"{database}.{name}"
if auto_create_table:
cols = [f"{c} {sql_type(t)}" for c, t in df.dtypes.to_dict().items()]
conn.cursor().execute(f"CREATE TABLE IF NOT EXISTS {name} ({','.join(cols)})")
count = _insert_df(conn._duck_conn, df, name) # <-- BUG π΄ Always inserts without checking overwrite flag
# mocks https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#output
mock_copy_results = [("fakesnow/file0.txt", "LOADED", count, count, 1, 0, None, None, None, None)]
# return success
return (True, len(mock_copy_results), count, mock_copy_results)Failing test
import pandas as pd
import snowflake.connector
import snowflake.connector.pandas_tools
from fakesnow import patch
def test_overwrite_true():
"""When overwrite=True, existing data should be replaced."""
with patch():
conn = snowflake.connector.connect(database="DB", schema="PUBLIC")
with conn.cursor() as cur:
# Create table with initial data
cur.execute("CREATE TABLE customers (ID int, FIRST_NAME varchar)")
cur.execute("INSERT INTO customers VALUES (1, 'Alice')")
# Write new data with overwrite=True
df = pd.DataFrame.from_records([{"ID": 2, "FIRST_NAME": "Bob"}])
snowflake.connector.pandas_tools.write_pandas(
conn, df, "CUSTOMERS", overwrite=True
)
# Should only have the new row (Alice should be gone)
cur.execute("SELECT * FROM customers ORDER BY ID")
result = cur.fetchall()
print(f"overwrite=True result: {result}")
assert result == [(2, "Bob")], f"Expected only Bob, got {result}"Observed output:
overwrite=True result: [(1, 'Alice'), (2, 'Bob')]
AssertionError: Expected only Bob, got [(1, 'Alice'), (2, 'Bob')]
Recommended fix
def write_pandas(
conn: FakeSnowflakeConnection,
df: pd.DataFrame,
table_name: str,
database: str | None = None,
schema: str | None = None,
chunk_size: int | None = None,
compression: str = "gzip",
on_error: str = "abort_statement",
parallel: int = 4,
quote_identifiers: bool = True,
auto_create_table: bool = False,
create_temp_table: bool = False,
overwrite: bool = False,
table_type: Literal["", "temp", "temporary", "transient"] = "",
**kwargs: Any,
) -> WritePandasResult:
name = table_name
if schema:
name = f"{schema}.{name}"
if database:
name = f"{database}.{name}"
if auto_create_table:
cols = [f"{c} {sql_type(t)}" for c, t in df.dtypes.to_dict().items()]
if overwrite:
# Drop existing table if overwriting with auto_create
conn.cursor().execute(f"DROP TABLE IF EXISTS {name}") # <-- FIX π’
conn.cursor().execute(f"CREATE TABLE {name} ({','.join(cols)})") # <-- FIX π’
else:
conn.cursor().execute(f"CREATE TABLE IF NOT EXISTS {name} ({','.join(cols)})")
elif overwrite:
# Truncate existing table if overwriting without auto_create
conn.cursor().execute(f"TRUNCATE TABLE {name}") # <-- FIX π’
count = _insert_df(conn._duck_conn, df, name)
# mocks https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#output
mock_copy_results = [("fakesnow/file0.txt", "LOADED", count, count, 1, 0, None, None, None, None)]
# return success
return (True, len(mock_copy_results), count, mock_copy_results)Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels