Skip to content

Commit c4b995f

Browse files
sfc-gh-zyaosfc-gh-fpawlowski
authored andcommitted
SNOW-2057867 refactor and fixes to make pandas write work for Python … (#2304)
(cherry picked from commit fe9547b)
1 parent 8109502 commit c4b995f

File tree

2 files changed

+34
-31
lines changed

2 files changed

+34
-31
lines changed

src/snowflake/connector/pandas_tools.py

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from snowflake.connector import ProgrammingError
2121
from snowflake.connector.options import pandas
2222
from snowflake.connector.telemetry import TelemetryData, TelemetryField
23-
from snowflake.connector.util_text import random_string
2423

2524
from ._utils import (
2625
_PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS_STRING,
@@ -108,11 +107,7 @@ def _create_temp_stage(
108107
overwrite: bool,
109108
use_scoped_temp_object: bool = False,
110109
) -> str:
111-
stage_name = (
112-
random_name_for_temp_object(TempObjectType.STAGE)
113-
if use_scoped_temp_object
114-
else random_string()
115-
)
110+
stage_name = random_name_for_temp_object(TempObjectType.STAGE)
116111
stage_location = build_location_helper(
117112
database=database,
118113
schema=schema,
@@ -179,11 +174,7 @@ def _create_temp_file_format(
179174
sql_use_logical_type: str,
180175
use_scoped_temp_object: bool = False,
181176
) -> str:
182-
file_format_name = (
183-
random_name_for_temp_object(TempObjectType.FILE_FORMAT)
184-
if use_scoped_temp_object
185-
else random_string()
186-
)
177+
file_format_name = random_name_for_temp_object(TempObjectType.FILE_FORMAT)
187178
file_format_location = build_location_helper(
188179
database=database,
189180
schema=schema,
@@ -388,6 +379,10 @@ def write_pandas(
388379
"Unsupported table type. Expected table types: temp/temporary, transient"
389380
)
390381

382+
if table_type.lower() in ["temp", "temporary"]:
383+
# Add scoped keyword when applicable.
384+
table_type = get_temp_type_for_object(_use_scoped_temp_object).lower()
385+
391386
if chunk_size is None:
392387
chunk_size = len(df)
393388

@@ -443,22 +438,13 @@ def write_pandas(
443438
# Dump chunk into parquet file
444439
chunk.to_parquet(chunk_path, compression=compression, **kwargs)
445440
# Upload parquet file
446-
upload_sql = (
447-
"PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
448-
"'file://{path}' ? PARALLEL={parallel}"
449-
).format(
450-
path=chunk_path.replace("\\", "\\\\").replace("'", "\\'"),
451-
parallel=parallel,
452-
)
453-
params = ("@" + stage_location,)
454-
logger.debug(f"uploading files with '{upload_sql}', params: %s", params)
455-
cursor.execute(
456-
upload_sql,
457-
_is_internal=True,
458-
_force_qmark_paramstyle=True,
459-
params=params,
460-
num_statements=1,
441+
path = chunk_path.replace("\\", "\\\\").replace("'", "\\'")
442+
cursor._upload(
443+
local_file_name=f"'file://{path}'",
444+
stage_location="@" + stage_location,
445+
options={"parallel": parallel, "source_compression": "auto_detect"},
461446
)
447+
462448
# Remove chunk file
463449
os.remove(chunk_path)
464450

@@ -522,7 +508,11 @@ def drop_object(name: str, object_type: str) -> None:
522508
target_table_location = build_location_helper(
523509
database,
524510
schema,
525-
random_string() if (overwrite and auto_create_table) else table_name,
511+
(
512+
random_name_for_temp_object(TempObjectType.TABLE)
513+
if (overwrite and auto_create_table)
514+
else table_name
515+
),
526516
quote_identifiers,
527517
)
528518

test/integ/pandas/test_pandas_tools.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from datetime import datetime, timedelta, timezone
77
from typing import TYPE_CHECKING, Any, Callable, Generator
88
from unittest import mock
9+
from unittest.mock import MagicMock
910

1011
import numpy.random
1112
import pytest
@@ -543,7 +544,10 @@ def mocked_execute(*args, **kwargs):
543544
with mock.patch(
544545
"snowflake.connector.cursor.SnowflakeCursor.execute",
545546
side_effect=mocked_execute,
546-
) as m_execute:
547+
) as m_execute, mock.patch(
548+
"snowflake.connector.cursor.SnowflakeCursor._upload",
549+
side_effect=MagicMock(),
550+
) as _:
547551
success, nchunks, nrows, _ = write_pandas(
548552
cnx,
549553
sf_connector_version_df.get(),
@@ -593,7 +597,10 @@ def mocked_execute(*args, **kwargs):
593597
with mock.patch(
594598
"snowflake.connector.cursor.SnowflakeCursor.execute",
595599
side_effect=mocked_execute,
596-
) as m_execute:
600+
) as m_execute, mock.patch(
601+
"snowflake.connector.cursor.SnowflakeCursor._upload",
602+
side_effect=MagicMock(),
603+
) as _:
597604
success, nchunks, nrows, _ = write_pandas(
598605
cnx,
599606
sf_connector_version_df.get(),
@@ -645,7 +652,10 @@ def mocked_execute(*args, **kwargs):
645652
with mock.patch(
646653
"snowflake.connector.cursor.SnowflakeCursor.execute",
647654
side_effect=mocked_execute,
648-
) as m_execute:
655+
) as m_execute, mock.patch(
656+
"snowflake.connector.cursor.SnowflakeCursor._upload",
657+
side_effect=MagicMock(),
658+
) as _:
649659
cnx._update_parameters({"PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS": True})
650660
success, nchunks, nrows, _ = write_pandas(
651661
cnx,
@@ -703,7 +713,10 @@ def mocked_execute(*args, **kwargs):
703713
with mock.patch(
704714
"snowflake.connector.cursor.SnowflakeCursor.execute",
705715
side_effect=mocked_execute,
706-
) as m_execute:
716+
) as m_execute, mock.patch(
717+
"snowflake.connector.cursor.SnowflakeCursor._upload",
718+
side_effect=MagicMock(),
719+
) as _:
707720
success, nchunks, nrows, _ = write_pandas(
708721
cnx,
709722
sf_connector_version_df.get(),

0 commit comments

Comments
 (0)