From 7fbd43bb6043f70b6eb477caaa1e255beffbda54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Szczerbi=C5=84ski?= Date: Tue, 22 Apr 2025 10:00:05 +0200 Subject: [PATCH 01/12] SNOW-2026002: Change invalid TLD to be RFC compliant (#2288) (cherry picked from commit ce85800acaefe1da408f41da6debca59a5b7ef13) --- src/snowflake/connector/connection_diagnostic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/connector/connection_diagnostic.py b/src/snowflake/connector/connection_diagnostic.py index 61edb99333..53ad72f14e 100644 --- a/src/snowflake/connector/connection_diagnostic.py +++ b/src/snowflake/connector/connection_diagnostic.py @@ -579,7 +579,7 @@ def __check_for_proxies(self) -> None: cert_reqs=cert_reqs, ) resp = http.request( - "GET", "https://nonexistentdomain.invalidtld", timeout=10.0 + "GET", "https://nonexistentdomain.invalid", timeout=10.0 ) # squid does not throw exception. Check HTML From 2438ac9dcd87f4355a1e595f548820d8bc79b40a Mon Sep 17 00:00:00 2001 From: Maxim Mishchenko Date: Thu, 24 Apr 2025 11:14:37 +0200 Subject: [PATCH 02/12] SNOW-2055494 fix proper boto min versions (#2295) (cherry picked from commit f80d83e9a7c69a5b6b35859e12fc25b63606a4e0) --- setup.cfg | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index 68d731c138..280890d066 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,8 +44,8 @@ python_requires = >=3.9 packages = find_namespace: install_requires = asn1crypto>0.24.0,<2.0.0 - boto3>=1.0 - botocore>=1.0 + boto3>=1.24 + botocore>=1.24 cffi>=1.9,<2.0.0 cryptography>=3.1.0 pyOpenSSL>=22.0.0,<25.0.0 From a30edd2889c14f3a657b371b1cf9eaa01fd7c0c9 Mon Sep 17 00:00:00 2001 From: Maxim Mishchenko Date: Mon, 28 Apr 2025 20:05:20 +0200 Subject: [PATCH 03/12] SNOW-2057797 Minor python connector version bump (#2302) (cherry picked from commit 98381aeba772fed3395081c385dd0f5de0341959) --- src/snowflake/connector/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/connector/version.py b/src/snowflake/connector/version.py index 7b64c6ae0b..ab15494243 100644 --- a/src/snowflake/connector/version.py +++ b/src/snowflake/connector/version.py @@ -1,3 +1,3 @@ # Update this for the versions # Don't change the forth version number from None -VERSION = (3, 14, 1, None) +VERSION = (3, 15, 0, None) From 6e4debfb73da280d71ab1538b6055bf72db5fb6d Mon Sep 17 00:00:00 2001 From: Maxim Mishchenko Date: Mon, 28 Apr 2025 23:22:20 +0200 Subject: [PATCH 04/12] SNOW-2057797 Update requirements files (#2305) Co-authored-by: github-actions (cherry picked from commit 15efed3ac2c2e7a61adf701ba56a307deb1fc4b1) --- tested_requirements/requirements_310.reqs | 10 +++++----- tested_requirements/requirements_311.reqs | 10 +++++----- tested_requirements/requirements_312.reqs | 12 ++++++------ tested_requirements/requirements_313.reqs | 12 ++++++------ tested_requirements/requirements_39.reqs | 10 +++++----- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/tested_requirements/requirements_310.reqs b/tested_requirements/requirements_310.reqs index c40c82708c..79e1754257 100644 --- a/tested_requirements/requirements_310.reqs +++ b/tested_requirements/requirements_310.reqs @@ -1,8 +1,8 @@ # Generated on: Python 3.10.17 asn1crypto==1.5.1 -boto3==1.37.38 -botocore==1.37.38 -certifi==2025.1.31 +boto3==1.38.4 +botocore==1.38.4 +certifi==2025.4.26 cffi==1.17.1 charset-normalizer==3.4.1 cryptography==44.0.2 @@ -17,10 +17,10 @@ pyOpenSSL==25.0.0 python-dateutil==2.9.0.post0 pytz==2025.2 requests==2.32.3 -s3transfer==0.11.5 +s3transfer==0.12.0 six==1.17.0 sortedcontainers==2.4.0 tomlkit==0.13.2 typing_extensions==4.13.2 urllib3==2.4.0 -snowflake-connector-python==3.14.1 +snowflake-connector-python==3.15.0 diff --git a/tested_requirements/requirements_311.reqs b/tested_requirements/requirements_311.reqs index 62f67fd30e..2853fc83d6 100644 --- a/tested_requirements/requirements_311.reqs +++ b/tested_requirements/requirements_311.reqs @@ -1,8 +1,8 @@ # Generated on: Python 3.11.12 asn1crypto==1.5.1 -boto3==1.37.38 -botocore==1.37.38 -certifi==2025.1.31 +boto3==1.38.4 +botocore==1.38.4 +certifi==2025.4.26 cffi==1.17.1 charset-normalizer==3.4.1 cryptography==44.0.2 @@ -17,10 +17,10 @@ pyOpenSSL==25.0.0 python-dateutil==2.9.0.post0 pytz==2025.2 requests==2.32.3 -s3transfer==0.11.5 +s3transfer==0.12.0 six==1.17.0 sortedcontainers==2.4.0 tomlkit==0.13.2 typing_extensions==4.13.2 urllib3==2.4.0 -snowflake-connector-python==3.14.1 +snowflake-connector-python==3.15.0 diff --git a/tested_requirements/requirements_312.reqs b/tested_requirements/requirements_312.reqs index 232359acd6..f519fbe710 100644 --- a/tested_requirements/requirements_312.reqs +++ b/tested_requirements/requirements_312.reqs @@ -1,8 +1,8 @@ # Generated on: Python 3.12.10 asn1crypto==1.5.1 -boto3==1.37.38 -botocore==1.37.38 -certifi==2025.1.31 +boto3==1.38.4 +botocore==1.38.4 +certifi==2025.4.26 cffi==1.17.1 charset-normalizer==3.4.1 cryptography==44.0.2 @@ -17,12 +17,12 @@ pyOpenSSL==25.0.0 python-dateutil==2.9.0.post0 pytz==2025.2 requests==2.32.3 -s3transfer==0.11.5 -setuptools==79.0.0 +s3transfer==0.12.0 +setuptools==80.0.0 six==1.17.0 sortedcontainers==2.4.0 tomlkit==0.13.2 typing_extensions==4.13.2 urllib3==2.4.0 wheel==0.45.1 -snowflake-connector-python==3.14.1 +snowflake-connector-python==3.15.0 diff --git a/tested_requirements/requirements_313.reqs b/tested_requirements/requirements_313.reqs index d206c77c50..63efc21d58 100644 --- a/tested_requirements/requirements_313.reqs +++ b/tested_requirements/requirements_313.reqs @@ -1,8 +1,8 @@ # Generated on: Python 3.13.3 asn1crypto==1.5.1 -boto3==1.37.38 -botocore==1.37.38 -certifi==2025.1.31 +boto3==1.38.4 +botocore==1.38.4 +certifi==2025.4.26 cffi==1.17.1 charset-normalizer==3.4.1 cryptography==44.0.2 @@ -17,12 +17,12 @@ pyOpenSSL==25.0.0 python-dateutil==2.9.0.post0 pytz==2025.2 requests==2.32.3 -s3transfer==0.11.5 -setuptools==79.0.0 +s3transfer==0.12.0 +setuptools==80.0.0 six==1.17.0 sortedcontainers==2.4.0 tomlkit==0.13.2 typing_extensions==4.13.2 urllib3==2.4.0 wheel==0.45.1 -snowflake-connector-python==3.14.1 +snowflake-connector-python==3.15.0 diff --git a/tested_requirements/requirements_39.reqs b/tested_requirements/requirements_39.reqs index 25e17ca852..9182e849ed 100644 --- a/tested_requirements/requirements_39.reqs +++ b/tested_requirements/requirements_39.reqs @@ -1,8 +1,8 @@ # Generated on: Python 3.9.22 asn1crypto==1.5.1 -boto3==1.37.38 -botocore==1.37.38 -certifi==2025.1.31 +boto3==1.38.4 +botocore==1.38.4 +certifi==2025.4.26 cffi==1.17.1 charset-normalizer==3.4.1 cryptography==44.0.2 @@ -17,10 +17,10 @@ pyOpenSSL==25.0.0 python-dateutil==2.9.0.post0 pytz==2025.2 requests==2.32.3 -s3transfer==0.11.5 +s3transfer==0.12.0 six==1.17.0 sortedcontainers==2.4.0 tomlkit==0.13.2 typing_extensions==4.13.2 urllib3==1.26.20 -snowflake-connector-python==3.14.1 +snowflake-connector-python==3.15.0 From 31608d84629b324f0cce3108ba8dfd98ba2d8c30 Mon Sep 17 00:00:00 2001 From: Naresh Kumar <113932371+sfc-gh-nkumar@users.noreply.github.com> Date: Mon, 5 May 2025 21:11:19 +0200 Subject: [PATCH 05/12] SNOW-2052629: Add basic arrow support for Interval data types (#2296) (cherry picked from commit bbc2f80b82c57edd548883ac159ce5d899c8ac44) --- setup.py | 1 + src/snowflake/connector/arrow_context.py | 40 ++++++++++- .../ArrowIterator/CArrowChunkIterator.cpp | 31 ++++++++ .../ArrowIterator/IntervalConverter.cpp | 71 +++++++++++++++++++ .../ArrowIterator/IntervalConverter.hpp | 56 +++++++++++++++ .../ArrowIterator/SnowflakeType.cpp | 2 + .../ArrowIterator/SnowflakeType.hpp | 2 + test/integ/test_arrow_result.py | 59 +++++++++++++++ test/unit/test_converter.py | 36 ++++++++++ 9 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.cpp create mode 100644 src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.hpp diff --git a/setup.py b/setup.py index 5a9e364e27..37e9a96fe2 100644 --- a/setup.py +++ b/setup.py @@ -103,6 +103,7 @@ def build_extension(self, ext): "FixedSizeListConverter.cpp", "FloatConverter.cpp", "IntConverter.cpp", + "IntervalConverter.cpp", "MapConverter.cpp", "ObjectConverter.cpp", "SnowflakeType.cpp", diff --git a/src/snowflake/connector/arrow_context.py b/src/snowflake/connector/arrow_context.py index 10dc9ea558..a14e75d7e2 100644 --- a/src/snowflake/connector/arrow_context.py +++ b/src/snowflake/connector/arrow_context.py @@ -15,7 +15,7 @@ from .converter import _generate_tzinfo_from_tzoffset if TYPE_CHECKING: - from numpy import datetime64, float64, int64 + from numpy import datetime64, float64, int64, timedelta64 try: @@ -163,3 +163,41 @@ def DECFLOAT_to_decimal(self, exponent: int, significand: bytes) -> decimal.Deci def DECFLOAT_to_numpy_float64(self, exponent: int, significand: bytes) -> float64: return numpy.float64(self.DECFLOAT_to_decimal(exponent, significand)) + + def INTERVAL_YEAR_MONTH_to_numpy_timedelta(self, months: int) -> timedelta64: + return numpy.timedelta64(months, "M") + + def INTERVAL_DAY_TIME_int_to_numpy_timedelta(self, nanos: int) -> timedelta64: + return numpy.timedelta64(nanos, "ns") + + def INTERVAL_DAY_TIME_int_to_timedelta(self, nanos: int) -> timedelta: + # Python timedelta only supports microsecond precision. We receive value in + # nanoseconds. + return timedelta(microseconds=nanos // 1000) + + def INTERVAL_DAY_TIME_decimal_to_numpy_timedelta(self, value: bytes) -> timedelta64: + # Snowflake supports up to 9 digits leading field precision for the day-time + # interval. That when represented in nanoseconds can not be stored in a 64-bit + # integer. So we send these as Decimal128 from server to client. + # Arrow uses little-endian by default. + # https://arrow.apache.org/docs/format/Columnar.html#byte-order-endianness + nanos = int.from_bytes(value, byteorder="little", signed=True) + # Numpy timedelta only supports up to 64-bit integers, so we need to change the + # unit to milliseconds to avoid overflow. + # Max value received from server + # = 10**9 * NANOS_PER_DAY - 1 + # = 86399999999999999999999 nanoseconds + # = 86399999999999999 milliseconds + # math.log2(86399999999999999) = 56.3 < 64 + return numpy.timedelta64(nanos // 1_000_000, "ms") + + def INTERVAL_DAY_TIME_decimal_to_timedelta(self, value: bytes) -> timedelta: + # Snowflake supports up to 9 digits leading field precision for the day-time + # interval. That when represented in nanoseconds can not be stored in a 64-bit + # integer. So we send these as Decimal128 from server to client. + # Arrow uses little-endian by default. + # https://arrow.apache.org/docs/format/Columnar.html#byte-order-endianness + nanos = int.from_bytes(value, byteorder="little", signed=True) + # Python timedelta only supports microsecond precision. We receive value in + # nanoseconds. + return timedelta(microseconds=nanos // 1000) diff --git a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/CArrowChunkIterator.cpp b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/CArrowChunkIterator.cpp index 95ac959c8a..aea7d42d05 100644 --- a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/CArrowChunkIterator.cpp +++ b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/CArrowChunkIterator.cpp @@ -13,6 +13,7 @@ #include "FixedSizeListConverter.hpp" #include "FloatConverter.hpp" #include "IntConverter.hpp" +#include "IntervalConverter.hpp" #include "MapConverter.hpp" #include "ObjectConverter.hpp" #include "StringConverter.hpp" @@ -479,6 +480,36 @@ std::shared_ptr getConverterFromSchema( break; } + case SnowflakeType::Type::INTERVAL_YEAR_MONTH: { + converter = std::make_shared( + array, context, useNumpy); + break; + } + + case SnowflakeType::Type::INTERVAL_DAY_TIME: { + switch (schemaView.type) { + case NANOARROW_TYPE_INT64: + converter = std::make_shared( + array, context, useNumpy); + break; + case NANOARROW_TYPE_DECIMAL128: + converter = std::make_shared( + array, context, useNumpy); + break; + default: { + std::string errorInfo = Logger::formatString( + "[Snowflake Exception] unknown arrow internal data type(%d) " + "for OBJECT data in %s", + NANOARROW_TYPE_ENUM_STRING[schemaView.type], + schemaView.schema->name); + logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); + PyErr_SetString(PyExc_Exception, errorInfo.c_str()); + break; + } + } + break; + } + default: { std::string errorInfo = Logger::formatString( "[Snowflake Exception] unknown snowflake data type : %d", st); diff --git a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.cpp b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.cpp new file mode 100644 index 0000000000..cc0afdbd9a --- /dev/null +++ b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.cpp @@ -0,0 +1,71 @@ +#include "IntervalConverter.hpp" + +#include +#include + +#include "Python/Common.hpp" +#include "Python/Helpers.hpp" + +namespace sf { + +static constexpr char INTERVAL_DT_DECIMAL_TO_NUMPY_TIMEDELTA[] = + "INTERVAL_DAY_TIME_decimal_to_numpy_timedelta"; +static constexpr char INTERVAL_DT_DECIMAL_TO_TIMEDELTA[] = + "INTERVAL_DAY_TIME_decimal_to_timedelta"; +static constexpr char INTERVAL_DT_INT_TO_NUMPY_TIMEDELTA[] = + "INTERVAL_DAY_TIME_int_to_numpy_timedelta"; +static constexpr char INTERVAL_DT_INT_TO_TIMEDELTA[] = + "INTERVAL_DAY_TIME_int_to_timedelta"; + +IntervalYearMonthConverter::IntervalYearMonthConverter(ArrowArrayView* array, + PyObject* context, + bool useNumpy) + : m_array(array), m_context(context), m_useNumpy(useNumpy) {} + +PyObject* IntervalYearMonthConverter::toPyObject(int64_t rowIndex) const { + if (ArrowArrayViewIsNull(m_array, rowIndex)) { + Py_RETURN_NONE; + } + int64_t val = ArrowArrayViewGetIntUnsafe(m_array, rowIndex); + if (m_useNumpy) { + return PyObject_CallMethod( + m_context, "INTERVAL_YEAR_MONTH_to_numpy_timedelta", "L", val); + } + // Python timedelta does not support year-month intervals. Use long instead. + return PyLong_FromLongLong(val); +} + +IntervalDayTimeConverterInt::IntervalDayTimeConverterInt(ArrowArrayView* array, + PyObject* context, + bool useNumpy) + : m_array(array), m_context(context) { + m_method = useNumpy ? INTERVAL_DT_INT_TO_NUMPY_TIMEDELTA + : INTERVAL_DT_INT_TO_TIMEDELTA; +} + +PyObject* IntervalDayTimeConverterInt::toPyObject(int64_t rowIndex) const { + if (ArrowArrayViewIsNull(m_array, rowIndex)) { + Py_RETURN_NONE; + } + int64_t val = ArrowArrayViewGetIntUnsafe(m_array, rowIndex); + return PyObject_CallMethod(m_context, m_method, "L", val); +} + +IntervalDayTimeConverterDecimal::IntervalDayTimeConverterDecimal( + ArrowArrayView* array, PyObject* context, bool useNumpy) + : m_array(array), m_context(context) { + m_method = useNumpy ? INTERVAL_DT_DECIMAL_TO_NUMPY_TIMEDELTA + : INTERVAL_DT_DECIMAL_TO_TIMEDELTA; +} + +PyObject* IntervalDayTimeConverterDecimal::toPyObject(int64_t rowIndex) const { + if (ArrowArrayViewIsNull(m_array, rowIndex)) { + Py_RETURN_NONE; + } + int64_t bytes_start = 16 * (m_array->array->offset + rowIndex); + const char* ptr_start = m_array->buffer_views[1].data.as_char; + PyObject* int128_bytes = + PyBytes_FromStringAndSize(&(ptr_start[bytes_start]), 16); + return PyObject_CallMethod(m_context, m_method, "S", int128_bytes); +} +} // namespace sf diff --git a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.hpp b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.hpp new file mode 100644 index 0000000000..cdffddb974 --- /dev/null +++ b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/IntervalConverter.hpp @@ -0,0 +1,56 @@ +#ifndef PC_INTERVALCONVERTER_HPP +#define PC_INTERVALCONVERTER_HPP + +#include + +#include "IColumnConverter.hpp" +#include "nanoarrow.h" +#include "nanoarrow.hpp" + +namespace sf { + +class IntervalYearMonthConverter : public IColumnConverter { + public: + explicit IntervalYearMonthConverter(ArrowArrayView* array, PyObject* context, + bool useNumpy); + virtual ~IntervalYearMonthConverter() = default; + + PyObject* toPyObject(int64_t rowIndex) const override; + + private: + ArrowArrayView* m_array; + PyObject* m_context; + bool m_useNumpy; +}; + +class IntervalDayTimeConverterInt : public IColumnConverter { + public: + explicit IntervalDayTimeConverterInt(ArrowArrayView* array, PyObject* context, + bool useNumpy); + virtual ~IntervalDayTimeConverterInt() = default; + + PyObject* toPyObject(int64_t rowIndex) const override; + + private: + ArrowArrayView* m_array; + PyObject* m_context; + const char* m_method; +}; + +class IntervalDayTimeConverterDecimal : public IColumnConverter { + public: + explicit IntervalDayTimeConverterDecimal(ArrowArrayView* array, + PyObject* context, bool useNumpy); + virtual ~IntervalDayTimeConverterDecimal() = default; + + PyObject* toPyObject(int64_t rowIndex) const override; + + private: + ArrowArrayView* m_array; + PyObject* m_context; + const char* m_method; +}; + +} // namespace sf + +#endif // PC_INTERVALCONVERTER_HPP diff --git a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.cpp b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.cpp index 6361f97597..a1c2625d7d 100644 --- a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.cpp +++ b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.cpp @@ -15,6 +15,8 @@ std::unordered_map {"FIXED", SnowflakeType::Type::FIXED}, {"DECFLOAT", SnowflakeType::Type::DECFLOAT}, {"FLOAT", SnowflakeType::Type::REAL}, + {"INTERVAL_YEAR_MONTH", SnowflakeType::Type::INTERVAL_YEAR_MONTH}, + {"INTERVAL_DAY_TIME", SnowflakeType::Type::INTERVAL_DAY_TIME}, {"MAP", SnowflakeType::Type::MAP}, {"OBJECT", SnowflakeType::Type::OBJECT}, {"REAL", SnowflakeType::Type::REAL}, diff --git a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.hpp b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.hpp index b01a152a95..128453585c 100644 --- a/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.hpp +++ b/src/snowflake/connector/nanoarrow_cpp/ArrowIterator/SnowflakeType.hpp @@ -30,6 +30,8 @@ class SnowflakeType { VECTOR = 16, MAP = 17, DECFLOAT = 18, + INTERVAL_YEAR_MONTH = 19, + INTERVAL_DAY_TIME = 20, }; static SnowflakeType::Type snowflakeTypeFromString(std::string str) { diff --git a/test/integ/test_arrow_result.py b/test/integ/test_arrow_result.py index 02f11ccbc4..189183bcf7 100644 --- a/test/integ/test_arrow_result.py +++ b/test/integ/test_arrow_result.py @@ -1223,6 +1223,65 @@ def test_fetch_as_numpy_val(conn_cnx): assert val[3] == numpy.datetime64("2019-01-02 12:34:56.12345678") +@pytest.mark.parametrize("use_numpy", [True, False]) +def test_select_year_month_interval_arrow(conn_cnx, use_numpy): + cases = ["0-0", "1-2", "-1-3", "999999999-11", "-999999999-11"] + expected = [0, 14, -15, 11_999_999_999, -11_999_999_999] + if use_numpy: + expected = [numpy.timedelta64(e, "M") for e in expected] + + table = "test_arrow_day_time_interval" + values = "(" + "),(".join([f"'{c}'" for c in cases]) + ")" + with conn_cnx(numpy=use_numpy) as conn: + cursor = conn.cursor() + cursor.execute("alter session set python_connector_query_result_format='arrow'") + + cursor.execute("alter session set feature_interval_types=enabled") + cursor.execute(f"create or replace table {table} (c1 interval year to month)") + cursor.execute(f"insert into {table} values {values}") + result = conn.cursor().execute(f"select * from {table}").fetchall() + result = [r[0] for r in result] + assert result == expected + + +@pytest.mark.skip( + reason="SNOW-1878635: Add support for day-time interval in ArrowStreamWriter" +) +@pytest.mark.parametrize("use_numpy", [True, False]) +def test_select_day_time_interval_arrow(conn_cnx, use_numpy): + cases = [ + "0 0:0:0.0", + "12 3:4:5.678", + "-1 2:3:4.567", + "99999 23:59:59.999999", + "-99999 23:59:59.999999", + ] + expected = [ + timedelta(days=0), + timedelta(days=12, hours=3, minutes=4, seconds=5.678), + -timedelta(days=1, hours=2, minutes=3, seconds=4.567), + timedelta(days=99999, hours=23, minutes=59, seconds=59.999999), + -timedelta(days=99999, hours=23, minutes=59, seconds=59.999999), + ] + if use_numpy: + expected = [numpy.timedelta64(e) for e in expected] + + table = "test_arrow_day_time_interval" + values = "(" + "),(".join([f"'{c}'" for c in cases]) + ")" + with conn_cnx(numpy=use_numpy) as conn: + cursor = conn.cursor() + cursor.execute("alter session set python_connector_query_result_format='arrow'") + + cursor.execute("alter session set feature_interval_types=enabled") + cursor.execute( + f"create or replace table {table} (c1 interval day(5) to second)" + ) + cursor.execute(f"insert into {table} values {values}") + result = conn.cursor().execute(f"select * from {table}").fetchall() + result = [r[0] for r in result] + assert result == expected + + def get_random_seed(): random.seed(datetime.now().timestamp()) return random.randint(0, 10000) diff --git a/test/unit/test_converter.py b/test/unit/test_converter.py index d1b143a6cd..37f41172fe 100644 --- a/test/unit/test_converter.py +++ b/test/unit/test_converter.py @@ -1,9 +1,11 @@ #!/usr/bin/env python from __future__ import annotations +from datetime import timedelta from decimal import Decimal from logging import getLogger +import numpy import pytest from snowflake.connector import ProgrammingError @@ -97,3 +99,37 @@ def test_converter_to_snowflake_bindings_error(): match=r"Binding data in type \(somethingsomething\) is not supported", ): converter._somethingsomething_to_snowflake_bindings("Bogus") + + +NANOS_PER_DAY = 24 * 60 * 60 * 10**9 + + +@pytest.mark.parametrize("nanos", [0, 1, 999, 1000, 999999, 10**5 * NANOS_PER_DAY - 1]) +def test_day_time_interval_int_to_timedelta(nanos): + converter = ArrowConverterContext() + assert converter.INTERVAL_DAY_TIME_int_to_timedelta(nanos) == timedelta( + microseconds=nanos // 1000 + ) + assert converter.INTERVAL_DAY_TIME_int_to_numpy_timedelta( + nanos + ) == numpy.timedelta64(nanos, "ns") + + +@pytest.mark.parametrize("nanos", [0, 1, 999, 1000, 999999, 10**9 * NANOS_PER_DAY - 1]) +def test_day_time_interval_decimal_to_timedelta(nanos): + converter = ArrowConverterContext() + nano_bytes = nanos.to_bytes(16, byteorder="little", signed=True) + assert converter.INTERVAL_DAY_TIME_decimal_to_timedelta(nano_bytes) == timedelta( + microseconds=nanos // 1000 + ) + assert converter.INTERVAL_DAY_TIME_decimal_to_numpy_timedelta( + nano_bytes + ) == numpy.timedelta64(nanos // 1_000_000, "ms") + + +@pytest.mark.parametrize("months", [0, 1, 999, 1000, 999999, 10**9 * 12 - 1]) +def test_year_month_interval_to_timedelta(months): + converter = ArrowConverterContext() + assert converter.INTERVAL_YEAR_MONTH_to_numpy_timedelta( + months + ) == numpy.timedelta64(months, "M") From 2c437ae5e0fa46965db9463a5d353fa02ed5477b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Sat, 9 Aug 2025 13:03:49 +0200 Subject: [PATCH 06/12] [Async] Apply #2296 to async code --- test/integ/aio/test_arrow_result_async.py | 65 +++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/integ/aio/test_arrow_result_async.py b/test/integ/aio/test_arrow_result_async.py index a9cbc5a418..f179cce6c2 100644 --- a/test/integ/aio/test_arrow_result_async.py +++ b/test/integ/aio/test_arrow_result_async.py @@ -1090,6 +1090,71 @@ async def test_fetch_as_numpy_val(conn_cnx): assert val[3] == numpy.datetime64("2019-01-02 12:34:56.12345678") +@pytest.mark.parametrize("use_numpy", [True, False]) +async def test_select_year_month_interval_arrow(conn_cnx, use_numpy): + cases = ["0-0", "1-2", "-1-3", "999999999-11", "-999999999-11"] + expected = [0, 14, -15, 11_999_999_999, -11_999_999_999] + if use_numpy: + expected = [numpy.timedelta64(e, "M") for e in expected] + + table = "test_arrow_day_time_interval" + values = "(" + "),(".join([f"'{c}'" for c in cases]) + ")" + async with conn_cnx(numpy=use_numpy) as conn: + cursor = conn.cursor() + await cursor.execute( + "alter session set python_connector_query_result_format='arrow'" + ) + + await cursor.execute("alter session set feature_interval_types=enabled") + await cursor.execute( + f"create or replace table {table} (c1 interval year to month)" + ) + await cursor.execute(f"insert into {table} values {values}") + result = await conn.cursor().execute(f"select * from {table}").fetchall() + result = [r[0] for r in result] + assert result == expected + + +@pytest.mark.skip( + reason="SNOW-1878635: Add support for day-time interval in ArrowStreamWriter" +) +@pytest.mark.parametrize("use_numpy", [True, False]) +async def test_select_day_time_interval_arrow(conn_cnx, use_numpy): + cases = [ + "0 0:0:0.0", + "12 3:4:5.678", + "-1 2:3:4.567", + "99999 23:59:59.999999", + "-99999 23:59:59.999999", + ] + expected = [ + timedelta(days=0), + timedelta(days=12, hours=3, minutes=4, seconds=5.678), + -timedelta(days=1, hours=2, minutes=3, seconds=4.567), + timedelta(days=99999, hours=23, minutes=59, seconds=59.999999), + -timedelta(days=99999, hours=23, minutes=59, seconds=59.999999), + ] + if use_numpy: + expected = [numpy.timedelta64(e) for e in expected] + + table = "test_arrow_day_time_interval" + values = "(" + "),(".join([f"'{c}'" for c in cases]) + ")" + async with conn_cnx(numpy=use_numpy) as conn: + cursor = conn.cursor() + await cursor.execute( + "alter session set python_connector_query_result_format='arrow'" + ) + + await cursor.execute("alter session set feature_interval_types=enabled") + await cursor.execute( + f"create or replace table {table} (c1 interval day(5) to second)" + ) + await cursor.execute(f"insert into {table} values {values}") + result = await conn.cursor().execute(f"select * from {table}").fetchall() + result = [r[0] for r in result] + assert result == expected + + async def iterate_over_test_chunk( test_name, conn_cnx, sql_text, row_count, col_count, eps=None, expected=None ): From fdf1d38fb7f7fe496cb71b80e128faaf743065a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kubik?= Date: Tue, 6 May 2025 12:35:30 +0200 Subject: [PATCH 07/12] NO-SNOW Enable structured types in fdn tables to unblock the CI (#2313) (cherry picked from commit b86680831a61781591f9b53fc0b61469ffbdb1a2) --- test/integ/test_arrow_result.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integ/test_arrow_result.py b/test/integ/test_arrow_result.py index 189183bcf7..7b3d13ee6b 100644 --- a/test/integ/test_arrow_result.py +++ b/test/integ/test_arrow_result.py @@ -219,6 +219,7 @@ def structured_type_wrapped_conn(conn_cnx, structured_type_support): "ENABLE_STRUCTURED_TYPES_NATIVE_ARROW_FORMAT": True, "FORCE_ENABLE_STRUCTURED_TYPES_NATIVE_ARROW_FORMAT": True, "IGNORE_CLIENT_VESRION_IN_STRUCTURED_TYPES_RESPONSE": True, + "ENABLE_STRUCTURED_TYPES_IN_FDN_TABLES": True, } with conn_cnx(session_parameters=parameters) as conn: From 0c305845f9226b83de5365c562b7bb52756abaad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Sat, 9 Aug 2025 13:06:43 +0200 Subject: [PATCH 08/12] [Async] Apply #2313 to async code --- test/integ/aio/test_arrow_result_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integ/aio/test_arrow_result_async.py b/test/integ/aio/test_arrow_result_async.py index f179cce6c2..40b138729f 100644 --- a/test/integ/aio/test_arrow_result_async.py +++ b/test/integ/aio/test_arrow_result_async.py @@ -129,6 +129,7 @@ async def structured_type_wrapped_conn(conn_cnx, structured_type_support): "ENABLE_STRUCTURED_TYPES_NATIVE_ARROW_FORMAT": True, "FORCE_ENABLE_STRUCTURED_TYPES_NATIVE_ARROW_FORMAT": True, "IGNORE_CLIENT_VESRION_IN_STRUCTURED_TYPES_RESPONSE": True, + "ENABLE_STRUCTURED_TYPES_IN_FDN_TABLES": True, } async with conn_cnx(session_parameters=parameters) as conn: From adc273ed99b56d8df95ab466d7086bbf59db234a Mon Sep 17 00:00:00 2001 From: Piotr Bulawa Date: Mon, 12 May 2025 09:02:04 +0200 Subject: [PATCH 09/12] SNOW-1959514: Pandas single quote character fix (#2307) (cherry picked from commit ecd5d9f779f47bc55f52357c41e27786982d2904) --- src/snowflake/connector/cursor.py | 5 +++- src/snowflake/connector/pandas_tools.py | 38 ++++++++++++++++--------- test/integ/pandas/test_pandas_tools.py | 25 ++++++++++++++++ 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index e6c3dfdb53..b6a96890e5 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -672,7 +672,10 @@ def _execute_helper( else: # or detect it. self._is_file_transfer = get_file_transfer_type(query) is not None - logger.debug("is_file_transfer: %s", self._is_file_transfer is not None) + logger.debug( + "is_file_transfer: %s", + self._is_file_transfer if self._is_file_transfer is not None else "None", + ) real_timeout = ( timeout if timeout and timeout > 0 else self._connection.network_timeout diff --git a/src/snowflake/connector/pandas_tools.py b/src/snowflake/connector/pandas_tools.py index 5c1626954e..a9555dd553 100644 --- a/src/snowflake/connector/pandas_tools.py +++ b/src/snowflake/connector/pandas_tools.py @@ -58,21 +58,26 @@ def build_location_helper( database: str | None, schema: str | None, name: str, quote_identifiers: bool ) -> str: """Helper to format table/stage/file format's location.""" - if quote_identifiers: - location = ( - (('"' + database + '".') if database else "") - + (('"' + schema + '".') if schema else "") - + ('"' + name + '"') - ) - else: - location = ( - (database + "." if database else "") - + (schema + "." if schema else "") - + name - ) + location = ( + (_escape_part_location(database, quote_identifiers) + "." if database else "") + + (_escape_part_location(schema, quote_identifiers) + "." if schema else "") + + _escape_part_location(name, quote_identifiers) + ) return location +def _escape_part_location(part: str, should_quote: bool) -> str: + if "'" in part: + should_quote = True + if should_quote: + if not part.startswith('"'): + part = '"' + part + if not part.endswith('"'): + part = part + '"' + + return part + + def _do_create_temp_stage( cursor: SnowflakeCursor, stage_location: str, @@ -473,6 +478,7 @@ def drop_object(name: str, object_type: str) -> None: drop_sql = f"DROP {object_type.upper()} IF EXISTS identifier(?) /* Python:snowflake.connector.pandas_tools.write_pandas() */" params = (name,) logger.debug(f"dropping {object_type} with '{drop_sql}'. params: %s", params) + cursor.execute( drop_sql, _is_internal=True, @@ -570,10 +576,11 @@ def drop_object(name: str, object_type: str) -> None: num_statements=1, ) + copy_stage_location = "@" + stage_location.replace("'", "\\'") copy_into_sql = ( f"COPY INTO identifier(?) /* Python:snowflake.connector.pandas_tools.write_pandas() */ " f"({columns}) " - f"FROM (SELECT {parquet_columns} FROM @{stage_location}) " + f"FROM (SELECT {parquet_columns} FROM '{copy_stage_location}') " f"FILE_FORMAT=(" f"TYPE=PARQUET " f"COMPRESSION={compression_map[compression]}" @@ -582,7 +589,10 @@ def drop_object(name: str, object_type: str) -> None: f") " f"PURGE=TRUE ON_ERROR=?" ) - params = (target_table_location, on_error) + params = ( + target_table_location, + on_error, + ) logger.debug(f"copying into with '{copy_into_sql}'. params: %s", params) copy_results = cursor.execute( copy_into_sql, diff --git a/test/integ/pandas/test_pandas_tools.py b/test/integ/pandas/test_pandas_tools.py index 8d69fd1a9f..43f788ebbb 100644 --- a/test/integ/pandas/test_pandas_tools.py +++ b/test/integ/pandas/test_pandas_tools.py @@ -566,6 +566,8 @@ def mocked_execute(*args, **kwargs): (None, "schema", False, "schema"), (None, None, True, ""), (None, None, False, ""), + ("data'base", "schema", True, '"data\'base"."schema"'), + ("data'base", "schema", False, '"data\'base".schema'), ], ) def test_stage_location_building( @@ -1101,3 +1103,26 @@ def test_write_pandas_with_on_error( assert result["COUNT(*)"] == 1 finally: cnx.execute_string(drop_sql) + + +def test_pandas_with_single_quote( + conn_cnx: Callable[..., Generator[SnowflakeConnection]], +): + random_table_name = random_string(5, "test'table") + table_name = f'"{random_table_name}"' + create_sql = f"CREATE OR REPLACE TABLE {table_name}(A INT)" + df_data = [[1]] + df = pandas.DataFrame(df_data, columns=["a"]) + with conn_cnx() as cnx: # type: SnowflakeConnection + try: + cnx.execute_string(create_sql) + write_pandas( + cnx, + df, + table_name, + quote_identifiers=False, + auto_create_table=False, + index=False, + ) + finally: + cnx.execute_string(f"drop table if exists {table_name}") From e845d3bda19a49668ad37009fce2788d62ce053f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Sat, 9 Aug 2025 13:11:11 +0200 Subject: [PATCH 10/12] [Async] Apply #2307 to async code --- src/snowflake/connector/aio/_cursor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/snowflake/connector/aio/_cursor.py b/src/snowflake/connector/aio/_cursor.py index 39a9f34791..ec2613dd54 100644 --- a/src/snowflake/connector/aio/_cursor.py +++ b/src/snowflake/connector/aio/_cursor.py @@ -235,7 +235,10 @@ async def _execute_helper( else: # or detect it. self._is_file_transfer = get_file_transfer_type(query) is not None - logger.debug("is_file_transfer: %s", self._is_file_transfer is not None) + logger.debug( + "is_file_transfer: %s", + self._is_file_transfer if self._is_file_transfer is not None else "None", + ) real_timeout = ( timeout if timeout and timeout > 0 else self._connection.network_timeout From 9833870b5c30a42504a5dd9cea00f9064ecc1b41 Mon Sep 17 00:00:00 2001 From: Zexin Yao <103003040+sfc-gh-zyao@users.noreply.github.com> Date: Mon, 12 May 2025 15:45:33 -0700 Subject: [PATCH 11/12] SNOW-2057867 refactor BindUploadAgent to make it work for Python sprocs (#2303) (cherry picked from commit 0d79989cc50369b7e688512829dbc0fe2cb836bd) --- src/snowflake/connector/bind_upload_agent.py | 8 +- src/snowflake/connector/cursor.py | 2 +- .../connector/direct_file_operation_utils.py | 34 ++++- .../integ/test_direct_file_operation_utils.py | 119 ++++++++++++++++++ test/unit/test_bind_upload_agent.py | 6 +- 5 files changed, 161 insertions(+), 8 deletions(-) create mode 100644 test/integ/test_direct_file_operation_utils.py diff --git a/src/snowflake/connector/bind_upload_agent.py b/src/snowflake/connector/bind_upload_agent.py index b71920d0b4..d01751cad8 100644 --- a/src/snowflake/connector/bind_upload_agent.py +++ b/src/snowflake/connector/bind_upload_agent.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from __future__ import annotations +import os import uuid from io import BytesIO from logging import getLogger @@ -76,8 +77,11 @@ def upload(self) -> None: if row_idx >= len(self.rows) or size >= self._stream_buffer_size: break try: - self.cursor.execute( - f"PUT file://{row_idx}.csv {self.stage_path}", file_stream=f + f.seek(0) + self.cursor._upload_stream( + input_stream=f, + stage_location=os.path.join(self.stage_path, f"{row_idx}.csv"), + options={"source_compression": "auto_detect"}, ) except Error as err: logger.debug("Failed to upload the bindings file to stage.") diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index b6a96890e5..69a741075b 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -1463,7 +1463,7 @@ def executemany( bind_stage = None if ( bind_size - > self.connection._session_parameters[ + >= self.connection._session_parameters[ "CLIENT_STAGE_ARRAY_BINDING_THRESHOLD" ] > 0 diff --git a/src/snowflake/connector/direct_file_operation_utils.py b/src/snowflake/connector/direct_file_operation_utils.py index 2290b8f1e2..6d0182c2fc 100644 --- a/src/snowflake/connector/direct_file_operation_utils.py +++ b/src/snowflake/connector/direct_file_operation_utils.py @@ -1,7 +1,15 @@ from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .connection import SnowflakeConnection + +import os from abc import ABC, abstractmethod +from .constants import CMD_TYPE_UPLOAD + class FileOperationParserBase(ABC): """The interface of internal utility functions for file operation parsing.""" @@ -37,8 +45,8 @@ def download_as_stream(self, ret, decompress=False): class FileOperationParser(FileOperationParserBase): - def __init__(self, connection): - pass + def __init__(self, connection: SnowflakeConnection): + self._connection = connection def parse_file_operation( self, @@ -49,7 +57,27 @@ def parse_file_operation( options, has_source_from_stream=False, ): - raise NotImplementedError("parse_file_operation is not yet supported") + """Parses a file operation by constructing SQL and getting the SQL parsing result from server.""" + options = options or {} + options_in_sql = " ".join(f"{k}={v}" for k, v in options.items()) + + if command_type == CMD_TYPE_UPLOAD: + if has_source_from_stream: + stage_location, unprefixed_local_file_name = os.path.split( + stage_location + ) + local_file_name = "file://" + unprefixed_local_file_name + sql = f"PUT {local_file_name} ? {options_in_sql}" + params = [stage_location] + else: + raise NotImplementedError(f"unsupported command type: {command_type}") + + with self._connection.cursor() as cursor: + # Send constructed SQL to server and get back parsing result. + processed_params = cursor._connection._process_params_qmarks(params, cursor) + return cursor._execute_helper( + sql, binding_params=processed_params, is_internal=True + ) class StreamDownloader(StreamDownloaderBase): diff --git a/test/integ/test_direct_file_operation_utils.py b/test/integ/test_direct_file_operation_utils.py new file mode 100644 index 0000000000..36d7335a4f --- /dev/null +++ b/test/integ/test_direct_file_operation_utils.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +from __future__ import annotations + +import os +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING, Callable, Generator + +import pytest + +try: + from snowflake.connector.options import pandas + from snowflake.connector.pandas_tools import ( + _iceberg_config_statement_helper, + write_pandas, + ) +except ImportError: + pandas = None + write_pandas = None + _iceberg_config_statement_helper = None + +if TYPE_CHECKING: + from snowflake.connector import SnowflakeConnection, SnowflakeCursor + + +def _normalize_windows_local_path(path): + return path.replace("\\", "\\\\").replace("'", "\\'") + + +def _validate_upload_content( + expected_content, cursor, stage_name, local_dir, base_file_name, is_compressed +): + gz_suffix = ".gz" + stage_path = f"@{stage_name}/{base_file_name}" + local_path = os.path.join(local_dir, base_file_name) + + cursor.execute( + f"GET {stage_path} 'file://{_normalize_windows_local_path(local_dir)}'", + ) + if is_compressed: + stage_path += gz_suffix + local_path += gz_suffix + import gzip + + with gzip.open(local_path, "r") as f: + read_content = f.read().decode("utf-8") + assert read_content == expected_content, (read_content, expected_content) + else: + with open(local_path) as f: + read_content = f.read() + assert read_content == expected_content, (read_content, expected_content) + + +def _test_runner( + conn_cnx: Callable[..., Generator[SnowflakeConnection]], + task: Callable[[SnowflakeCursor, str, str, str], None], + is_compressed: bool, + special_stage_name: str = None, + special_base_file_name: str = None, +): + from snowflake.connector._utils import TempObjectType, random_name_for_temp_object + + with conn_cnx() as conn: + cursor = conn.cursor() + stage_name = special_stage_name or random_name_for_temp_object( + TempObjectType.STAGE + ) + cursor.execute(f"CREATE OR REPLACE SCOPED TEMP STAGE {stage_name}") + expected_content = "hello, world" + with TemporaryDirectory() as temp_dir: + base_file_name = special_base_file_name or "test.txt" + src_file_name = os.path.join(temp_dir, base_file_name) + with open(src_file_name, "w") as f: + f.write(expected_content) + # Run the file operation + task(cursor, stage_name, temp_dir, base_file_name) + # Clean up before validation. + os.remove(src_file_name) + # Validate result. + _validate_upload_content( + expected_content, + cursor, + stage_name, + temp_dir, + base_file_name, + is_compressed=is_compressed, + ) + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize("is_compressed", [False, True]) +def test_upload( + conn_cnx: Callable[..., Generator[SnowflakeConnection]], + is_compressed: bool, +): + def upload_task(cursor, stage_name, temp_dir, base_file_name): + cursor._upload( + local_file_name=f"'file://{_normalize_windows_local_path(os.path.join(temp_dir, base_file_name))}'", + stage_location=f"@{stage_name}", + options={"auto_compress": is_compressed}, + ) + + _test_runner(conn_cnx, upload_task, is_compressed=is_compressed) + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize("is_compressed", [False, True]) +def test_upload_stream( + conn_cnx: Callable[..., Generator[SnowflakeConnection]], + is_compressed: bool, +): + def upload_stream_task(cursor, stage_name, temp_dir, base_file_name): + with open(f"{os.path.join(temp_dir, base_file_name)}", "rb") as input_stream: + cursor._upload_stream( + input_stream=input_stream, + stage_location=f"@{os.path.join(stage_name, base_file_name)}", + options={"auto_compress": is_compressed}, + ) + + _test_runner(conn_cnx, upload_stream_task, is_compressed=is_compressed) diff --git a/test/unit/test_bind_upload_agent.py b/test/unit/test_bind_upload_agent.py index 6f9ed64740..e5f8c1ea9e 100644 --- a/test/unit/test_bind_upload_agent.py +++ b/test/unit/test_bind_upload_agent.py @@ -12,7 +12,8 @@ def test_bind_upload_agent_uploading_multiple_files(): rows = [bytes(10)] * 10 agent = BindUploadAgent(csr, rows, stream_buffer_size=10) agent.upload() - assert csr.execute.call_count == 11 # 1 for stage creation + 10 files + assert csr.execute.call_count == 1 # 1 for stage creation + assert csr._upload_stream.call_count == 10 # 10 for 10 files def test_bind_upload_agent_row_size_exceed_buffer_size(): @@ -22,7 +23,8 @@ def test_bind_upload_agent_row_size_exceed_buffer_size(): rows = [bytes(15)] * 10 agent = BindUploadAgent(csr, rows, stream_buffer_size=10) agent.upload() - assert csr.execute.call_count == 11 # 1 for stage creation + 10 files + assert csr.execute.call_count == 1 # 1 for stage creation + assert csr._upload_stream.call_count == 10 # 10 for 10 files def test_bind_upload_agent_scoped_temp_object(): From 0208d11d0410948af62ab4301e72e5da242a50fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Sat, 9 Aug 2025 14:02:49 +0200 Subject: [PATCH 12/12] [Async] Apply #2303 to async code # Conflicts: # src/snowflake/connector/aio/_direct_file_operation_utils.py --- ..._upload_agent.py => _bind_upload_agent.py} | 8 +- src/snowflake/connector/aio/_cursor.py | 4 +- .../aio/_direct_file_operation_utils.py | 34 ++++- .../test_direct_file_operation_utils_async.py | 117 ++++++++++++++++++ test/unit/aio/test_bind_upload_agent_async.py | 10 +- 5 files changed, 162 insertions(+), 11 deletions(-) rename src/snowflake/connector/aio/{_build_upload_agent.py => _bind_upload_agent.py} (88%) create mode 100644 test/integ/aio/test_direct_file_operation_utils_async.py diff --git a/src/snowflake/connector/aio/_build_upload_agent.py b/src/snowflake/connector/aio/_bind_upload_agent.py similarity index 88% rename from src/snowflake/connector/aio/_build_upload_agent.py rename to src/snowflake/connector/aio/_bind_upload_agent.py index d68d053234..d1b08fe656 100644 --- a/src/snowflake/connector/aio/_build_upload_agent.py +++ b/src/snowflake/connector/aio/_bind_upload_agent.py @@ -3,6 +3,7 @@ from __future__ import annotations +import os from io import BytesIO from logging import getLogger from typing import TYPE_CHECKING, cast @@ -56,8 +57,11 @@ async def upload(self) -> None: if row_idx >= len(self.rows) or size >= self._stream_buffer_size: break try: - await self.cursor.execute( - f"PUT file://{row_idx}.csv {self.stage_path}", file_stream=f + f.seek(0) + await self.cursor._upload_stream( + input_stream=f, + stage_location=os.path.join(self.stage_path, f"{row_idx}.csv"), + options={"source_compression": "auto_detect"}, ) except Error as err: logger.debug("Failed to upload the bindings file to stage.") diff --git a/src/snowflake/connector/aio/_cursor.py b/src/snowflake/connector/aio/_cursor.py index ec2613dd54..cfde2b0341 100644 --- a/src/snowflake/connector/aio/_cursor.py +++ b/src/snowflake/connector/aio/_cursor.py @@ -23,7 +23,7 @@ ProgrammingError, ) from snowflake.connector._sql_util import get_file_transfer_type -from snowflake.connector.aio._build_upload_agent import BindUploadAgent +from snowflake.connector.aio._bind_upload_agent import BindUploadAgent from snowflake.connector.aio._result_batch import ( ResultBatch, create_batches_from_response, @@ -803,7 +803,7 @@ async def executemany( bind_stage = None if ( bind_size - > self.connection._session_parameters[ + >= self.connection._session_parameters[ "CLIENT_STAGE_ARRAY_BINDING_THRESHOLD" ] > 0 diff --git a/src/snowflake/connector/aio/_direct_file_operation_utils.py b/src/snowflake/connector/aio/_direct_file_operation_utils.py index e63bd14d63..9b0ea636b9 100644 --- a/src/snowflake/connector/aio/_direct_file_operation_utils.py +++ b/src/snowflake/connector/aio/_direct_file_operation_utils.py @@ -1,7 +1,15 @@ from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._connection import SnowflakeConnection + +import os from abc import ABC, abstractmethod +from ..constants import CMD_TYPE_UPLOAD + class FileOperationParserBase(ABC): """The interface of internal utility functions for file operation parsing.""" @@ -37,8 +45,8 @@ async def download_as_stream(self, ret, decompress=False): class FileOperationParser(FileOperationParserBase): - def __init__(self, connection): - pass + def __init__(self, connection: SnowflakeConnection): + self._connection = connection async def parse_file_operation( self, @@ -49,7 +57,27 @@ async def parse_file_operation( options, has_source_from_stream=False, ): - raise NotImplementedError("parse_file_operation is not yet supported") + """Parses a file operation by constructing SQL and getting the SQL parsing result from server.""" + options = options or {} + options_in_sql = " ".join(f"{k}={v}" for k, v in options.items()) + + if command_type == CMD_TYPE_UPLOAD: + if has_source_from_stream: + stage_location, unprefixed_local_file_name = os.path.split( + stage_location + ) + local_file_name = "file://" + unprefixed_local_file_name + sql = f"PUT {local_file_name} ? {options_in_sql}" + params = [stage_location] + else: + raise NotImplementedError(f"unsupported command type: {command_type}") + + async with self._connection.cursor() as cursor: + # Send constructed SQL to server and get back parsing result. + processed_params = cursor._connection._process_params_qmarks(params, cursor) + return await cursor._execute_helper( + sql, binding_params=processed_params, is_internal=True + ) class StreamDownloader(StreamDownloaderBase): diff --git a/test/integ/aio/test_direct_file_operation_utils_async.py b/test/integ/aio/test_direct_file_operation_utils_async.py new file mode 100644 index 0000000000..350b506759 --- /dev/null +++ b/test/integ/aio/test_direct_file_operation_utils_async.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +from __future__ import annotations + +import os +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING, AsyncGenerator, Callable, Coroutine + +import pytest + +try: + from snowflake.connector.options import pandas + from snowflake.connector.pandas_tools import ( + _iceberg_config_statement_helper, + write_pandas, + ) +except ImportError: + pandas = None + write_pandas = None + _iceberg_config_statement_helper = None + +if TYPE_CHECKING: + from snowflake.connector.aio import SnowflakeConnection, SnowflakeCursor + +from ..test_direct_file_operation_utils import _normalize_windows_local_path + + +async def _validate_upload_content( + expected_content, cursor, stage_name, local_dir, base_file_name, is_compressed +): + gz_suffix = ".gz" + stage_path = f"@{stage_name}/{base_file_name}" + local_path = os.path.join(local_dir, base_file_name) + + await cursor.execute( + f"GET {stage_path} 'file://{_normalize_windows_local_path(local_dir)}'", + ) + if is_compressed: + stage_path += gz_suffix + local_path += gz_suffix + import gzip + + with gzip.open(local_path, "r") as f: + read_content = f.read().decode("utf-8") + assert read_content == expected_content, (read_content, expected_content) + else: + with open(local_path) as f: + read_content = f.read() + assert read_content == expected_content, (read_content, expected_content) + + +async def _test_runner( + conn_cnx: Callable[..., AsyncGenerator[SnowflakeConnection]], + task: Callable[[SnowflakeCursor, str, str, str], Coroutine[None, None, None]], + is_compressed: bool, + special_stage_name: str = None, + special_base_file_name: str = None, +): + from snowflake.connector._utils import TempObjectType, random_name_for_temp_object + + async with conn_cnx() as conn: + cursor = conn.cursor() + stage_name = special_stage_name or random_name_for_temp_object( + TempObjectType.STAGE + ) + await cursor.execute(f"CREATE OR REPLACE SCOPED TEMP STAGE {stage_name}") + expected_content = "hello, world" + with TemporaryDirectory() as temp_dir: + base_file_name = special_base_file_name or "test.txt" + src_file_name = os.path.join(temp_dir, base_file_name) + with open(src_file_name, "w") as f: + f.write(expected_content) + # Run the file operation + await task(cursor, stage_name, temp_dir, base_file_name) + # Clean up before validation. + os.remove(src_file_name) + # Validate result. + await _validate_upload_content( + expected_content, + cursor, + stage_name, + temp_dir, + base_file_name, + is_compressed=is_compressed, + ) + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize("is_compressed", [False, True]) +async def test_upload( + conn_cnx: Callable[..., AsyncGenerator[SnowflakeConnection]], + is_compressed: bool, +): + async def upload_task(cursor, stage_name, temp_dir, base_file_name): + await cursor._upload( + local_file_name=f"'file://{_normalize_windows_local_path(os.path.join(temp_dir, base_file_name))}'", + stage_location=f"@{stage_name}", + options={"auto_compress": is_compressed}, + ) + + await _test_runner(conn_cnx, upload_task, is_compressed=is_compressed) + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize("is_compressed", [False, True]) +async def test_upload_stream( + conn_cnx: Callable[..., AsyncGenerator[SnowflakeConnection]], + is_compressed: bool, +): + async def upload_stream_task(cursor, stage_name, temp_dir, base_file_name): + with open(f"{os.path.join(temp_dir, base_file_name)}", "rb") as input_stream: + await cursor._upload_stream( + input_stream=input_stream, + stage_location=f"@{os.path.join(stage_name, base_file_name)}", + options={"auto_compress": is_compressed}, + ) + + await _test_runner(conn_cnx, upload_stream_task, is_compressed=is_compressed) diff --git a/test/unit/aio/test_bind_upload_agent_async.py b/test/unit/aio/test_bind_upload_agent_async.py index ffceb50f15..846642caa9 100644 --- a/test/unit/aio/test_bind_upload_agent_async.py +++ b/test/unit/aio/test_bind_upload_agent_async.py @@ -9,20 +9,22 @@ async def test_bind_upload_agent_uploading_multiple_files(): - from snowflake.connector.aio._build_upload_agent import BindUploadAgent + from snowflake.connector.aio._bind_upload_agent import BindUploadAgent csr = AsyncMock(auto_spec=True) rows = [bytes(10)] * 10 agent = BindUploadAgent(csr, rows, stream_buffer_size=10) await agent.upload() - assert csr.execute.call_count == 11 # 1 for stage creation + 10 files + assert csr.execute.call_count == 1 # 1 for stage creation + assert csr._upload_stream.call_count == 10 # 10 for 10 files async def test_bind_upload_agent_row_size_exceed_buffer_size(): - from snowflake.connector.aio._build_upload_agent import BindUploadAgent + from snowflake.connector.aio._bind_upload_agent import BindUploadAgent csr = AsyncMock(auto_spec=True) rows = [bytes(15)] * 10 agent = BindUploadAgent(csr, rows, stream_buffer_size=10) await agent.upload() - assert csr.execute.call_count == 11 # 1 for stage creation + 10 files + assert csr.execute.call_count == 1 # 1 for stage creation + assert csr._upload_stream.call_count == 10 # 10 for 10 files