Cherrypicks to aio connector part9 #2460

Open
wants to merge 56 commits into
base: cherrypicks-to-aio-connector-part8
56 commits
2722b95
Add support for workload identity federation (#2203)
sfc-gh-pmansour Mar 17, 2025
f1dd2bb
[Async] Apply #2203 to async code
sfc-gh-pczajka Jul 29, 2025
c2cda00
use aiohttp in wif_util
sfc-gh-pczajka Jul 29, 2025
e776ae4
Remove duplication in wif_util
sfc-gh-pczajka Jul 29, 2025
a66b110
remove duplication in workflow identity
sfc-gh-pczajka Jul 29, 2025
7ecd740
properly mock tests
sfc-gh-pczajka Jul 29, 2025
63f918c
use aioboto3
sfc-gh-pczajka Jul 30, 2025
78f08f6
Replace return with raise in WIF error check (#2231)
sfc-gh-pmansour Mar 24, 2025
c6394c7
Fix inheritance order; add tests
sfc-gh-pczajka Aug 6, 2025
30865dc
Fix async get_aws_region
sfc-gh-pczajka Aug 7, 2025
10b76d3
remove silent exception catching; fix async get_aws_region
sfc-gh-pczajka Aug 7, 2025
75470f9
review fix: add test cases
sfc-gh-pczajka Aug 7, 2025
a803c48
split csp_helpers into sync and async
sfc-gh-pczajka Aug 7, 2025
0d16928
NO-SNOW: Run test when targeting branches other than main (#2221)
sfc-gh-jszczerbinski Mar 19, 2025
029197c
SNOW-2007887: improve error message handling related to timeout (#2236)
sfc-gh-aling Mar 26, 2025
53fcf4a
[ASYNC] Apply #2236 to async code
sfc-gh-pczajka Aug 5, 2025
80bfec9
SNOW-1789751: Add GCP regional and virtual endpoints support (#2233)
sfc-gh-pbulawa Mar 27, 2025
2a0f804
[ASYNC] Apply #2233 to async code
sfc-gh-pczajka Aug 5, 2025
92dda02
SNOW-2027116 Allow for UUID encoding in SnowflakeRestful interface (#…
sfc-gh-lspiegelberg Apr 9, 2025
359b1c4
[ASYNC] apply #2254 to async code
sfc-gh-pczajka Aug 7, 2025
bc6f1fb
SNOW-1955965: Fix expired S3 credentials update (#2258)
sfc-gh-pbulawa Apr 10, 2025
d79fb54
[ASYNC] apply #2258 to async code
sfc-gh-pczajka Aug 7, 2025
e6c8cdd
NO-SNOW Add PAT to authenticators allowing empty username, remove han…
sfc-gh-mkubik Apr 10, 2025
bcc8735
[ASYNC] Apply #2264 to async code
sfc-gh-pczajka Aug 7, 2025
a61ee6b
NO-SNOW Fix flaky query timeout test (#2266)
sfc-gh-mkubik Apr 11, 2025
9acde0c
SNOW-2040000 change tag to bptp-stable (#2268)
sfc-gh-akolodziejczyk Apr 14, 2025
ecd2134
SNOW-2028051 introduce a new client_fetch_threads connection paramete…
sfc-gh-mmishchenko Apr 14, 2025
5f04572
Add default entra app ID for Snowflake (#2267)
sfc-gh-pmansour Apr 15, 2025
d42c572
[ASYNC] update test after #2267
sfc-gh-pczajka Aug 7, 2025
adebadc
SNOW-2011595 Masking filter introduced on library levels (#2253)
sfc-gh-fpawlowski Apr 15, 2025
3da9417
[ASYNC] remove azure filter after #2253
sfc-gh-pczajka Aug 7, 2025
3afc7b4
NO-SNOW acquiring a lock on local OCSP cache will use a timeout (#2280)
sfc-gh-mmishchenko Apr 17, 2025
8e03822
Accept both v1 and v2 Entra ID issuer formats for WIF (#2281)
sfc-gh-pmansour Apr 17, 2025
aae2f0d
[ASYNC] apply #2281 to async code
sfc-gh-pczajka Aug 7, 2025
498d928
SNOW-2048239 revert zero timeout for oscp cache lock (#2283)
sfc-gh-mmishchenko Apr 18, 2025
57ee3f3
[ASYNC] remove flaky marker from OCSP tests after #2283
sfc-gh-pczajka Aug 7, 2025
609427f
SNOW-1993520 update tested_reqs for 3.14.1 release (#2287)
sfc-gh-mmishchenko Apr 21, 2025
a7e0f8e
SNOW-1963078 Port _upload / _download / _upload_stream / _download_st…
sfc-gh-zyao Mar 21, 2025
8cb2f45
[Async] Apply #2198 to async code
sfc-gh-fpawlowski Aug 7, 2025
0a08f6b
SNOW-1989239 - prevent silent failures on nano-arrow conversion (#2227)
sfc-gh-mkubik Mar 28, 2025
c9320ea
[Async] Apply #2227 to async code
sfc-gh-fpawlowski Aug 7, 2025
0e44bfa
NO-SNOW skip out of range year test on old driver (#2243)
sfc-gh-mkubik Mar 31, 2025
5254ff3
[Async] Apply #2243 to async code
sfc-gh-fpawlowski Aug 9, 2025
cd8f701
SNOW-191538 remove copyright headers (#2238)
sfc-gh-mkubik Apr 1, 2025
cb22ea9
[Async] Apply #2238 to async code
sfc-gh-fpawlowski Aug 7, 2025
e72912c
SNOW-1789751: Pass GCS regional and virtual params (#2241)
sfc-gh-pbulawa Apr 1, 2025
95cae93
[Async] Apply 2241 to async code
sfc-gh-fpawlowski Aug 7, 2025
6ec0460
SNOW-2019505 fix inconsistent force_put_overwrite value for _upload a…
sfc-gh-zyao Apr 2, 2025
4893131
[Async] Apply #2247 to async code
sfc-gh-fpawlowski Aug 7, 2025
ff012ea
SNOW-1896089: Lower log level (#2251)
sfc-gh-pbulawa Apr 3, 2025
56da1b9
[Async] Apply #2251 to async code
sfc-gh-fpawlowski Aug 7, 2025
183c25d
SNOW-2026002: Invalid url became valid (#2252)
sfc-gh-jszczerbinski Apr 4, 2025
d3654a6
SNOW-2011670 Allow url parameter requestId to be set with statement p…
sfc-gh-lspiegelberg Apr 4, 2025
1668800
[Async] Apply #2240 to async code
sfc-gh-fpawlowski Aug 9, 2025
592fd95
NO-SNOW skip tests of custom requestId on olddriver (#2256)
sfc-gh-mkubik Apr 8, 2025
68d64a4
[Async] Apply #2256 to async code
sfc-gh-fpawlowski Aug 9, 2025
7 changes: 2 additions & 5 deletions .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
@@ -10,10 +10,7 @@ on:
- v*
pull_request:
branches:
- master
- main
- prep-**
- dev/aio-connector
- '**'
workflow_dispatch:
inputs:
logLevel:
@@ -24,7 +21,7 @@ on:
description: "Test scenario tags"

concurrency:
# older builds for the same pull request numer or branch should be cancelled
# older builds for the same pull request number or branch should be cancelled
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}

4 changes: 3 additions & 1 deletion DESCRIPTION.md
@@ -15,9 +15,11 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
- Added handling of PAT provided in `password` field.
- Improved error message for client-side query cancellations due to timeouts.
- Added support of GCS regional endpoints.
- Added `gcs_use_virtual_endpoints` connection property that forces the usage of the virtual GCS usage. See more: https://cloud.google.com/storage/docs/request-endpoints#xml-api
- Added `gcs_use_virtual_endpoints` connection property that forces the usage of the virtual GCS usage. Thanks to this it should be possible to set up private DNS entry for the GCS endpoint. See more: https://cloud.google.com/storage/docs/request-endpoints#xml-api
- Fixed a bug that caused driver to fail silently on `TO_DATE` arrow to python conversion when invalid date was followed by the correct one.
- Added `check_arrow_conversion_error_on_every_column` connection property that can be set to `False` to restore previous behaviour in which driver will ignore errors until it occurs in the last column. This flag's purpose is to unblock workflows that may be impacted by the bugfix and will be removed in later releases.
- Lower log levels from info to debug for some of the messages to make the output easier to follow.
- Allow the connector to inherit a UUID4 generated upstream, provided in statement parameters (field: `requestId`), rather than automatically generate a UUID4 to use for the HTTP Request ID.

- v3.14.0(March 03, 2025)
- Bumped pyOpenSSL dependency upper boundary from <25.0.0 to <26.0.0.
4 changes: 2 additions & 2 deletions Jenkinsfile
@@ -38,10 +38,10 @@ timestamps {
stage('Test') {
try {
def commit_hash = "main" // default which we want to override
def bptp_tag = "bptp-built"
def bptp_tag = "bptp-stable"
def response = authenticatedGithubCall("https://api.github.com/repos/snowflakedb/snowflake/git/ref/tags/${bptp_tag}")
commit_hash = response.object.sha
// Append the bptp-built commit sha to params
// Append the bptp-stable commit sha to params
params += [string(name: 'svn_revision', value: commit_hash)]
} catch(Exception e) {
println("Exception computing commit hash from: ${response}")
4 changes: 1 addition & 3 deletions benchmark/benchmark_unit_converter.py
@@ -1,7 +1,5 @@
#!/usr/bin/env python
#
# Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

from logging import getLogger
3 changes: 0 additions & 3 deletions samples/auth_by_key_pair_from_file.py
@@ -1,7 +1,4 @@
#!/usr/bin/env python
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
"""
This sample shows how to implement a key pair authentication plugin
which reads private key from a file
3 changes: 3 additions & 0 deletions setup.cfg
@@ -44,6 +44,8 @@ python_requires = >=3.9
packages = find_namespace:
install_requires =
asn1crypto>0.24.0,<2.0.0
boto3>=1.0
botocore>=1.0
cffi>=1.9,<2.0.0
cryptography>=3.1.0
pyOpenSSL>=22.0.0,<25.0.0
@@ -98,3 +100,4 @@ secure-local-storage =
keyring>=23.1.0,<26.0.0
aio =
aiohttp
aioboto3
3 changes: 0 additions & 3 deletions setup.py
@@ -1,7 +1,4 @@
#!/usr/bin/env python
#
# Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved.
#

import os
import sys
7 changes: 3 additions & 4 deletions src/snowflake/connector/__init__.py
@@ -1,8 +1,4 @@
#!/usr/bin/env python
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

# Python Db API v2
#
from __future__ import annotations
@@ -16,6 +12,8 @@
import logging
from logging import NullHandler

from snowflake.connector.externals_utils.externals_setup import setup_external_libraries

from .connection import SnowflakeConnection
from .cursor import DictCursor
from .dbapi import (
@@ -48,6 +46,7 @@
from .version import VERSION

logging.getLogger(__name__).addHandler(NullHandler())
setup_external_libraries()


@wraps(SnowflakeConnection.__init__)
3 changes: 0 additions & 3 deletions src/snowflake/connector/_query_context_cache.py
@@ -1,6 +1,3 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from __future__ import annotations

from functools import total_ordering
4 changes: 0 additions & 4 deletions src/snowflake/connector/_sql_util.py
@@ -1,7 +1,3 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

import re
22 changes: 18 additions & 4 deletions src/snowflake/connector/_utils.py
@@ -1,13 +1,10 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

import string
from enum import Enum
from random import choice
from threading import Timer
from uuid import UUID


class TempObjectType(Enum):
@@ -33,6 +30,8 @@ class TempObjectType(Enum):
"PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS"
)

REQUEST_ID_STATEMENT_PARAM_NAME = "requestId"


def generate_random_alphanumeric(length: int = 10) -> str:
return "".join(choice(ALPHANUMERIC) for _ in range(length))
@@ -46,6 +45,21 @@ def get_temp_type_for_object(use_scoped_temp_objects: bool) -> str:
return SCOPED_TEMPORARY_STRING if use_scoped_temp_objects else TEMPORARY_STRING


def is_uuid4(str_or_uuid: str | UUID) -> bool:
"""Check whether provided string str is a valid UUID version4."""
if isinstance(str_or_uuid, UUID):
return str_or_uuid.version == 4

if not isinstance(str_or_uuid, str):
return False

try:
uuid_str = str(UUID(str_or_uuid, version=4))
except ValueError:
return False
return uuid_str == str_or_uuid


class _TrackedQueryCancellationTimer(Timer):
def __init__(self, interval, function, args=None, kwargs=None):
super().__init__(interval, function, args, kwargs)
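
For reviewers checking the new `is_uuid4` helper, the sketch below copies the function body from the hunk above into a standalone script and exercises a few edge cases. The sample UUID string is an arbitrary illustrative value, not one taken from this PR. Because `UUID(..., version=4)` overwrites the version bits of whatever it parses, the final string comparison appears to be what rejects non-v4 input, along with uppercase or unhyphenated spellings.

```python
from __future__ import annotations

from uuid import UUID, uuid1, uuid4


def is_uuid4(str_or_uuid: str | UUID) -> bool:
    """Return True for a version-4 UUID object or its canonical string form."""
    if isinstance(str_or_uuid, UUID):
        return str_or_uuid.version == 4

    if not isinstance(str_or_uuid, str):
        return False

    try:
        # version=4 forces the version bits, so a non-v4 input parses
        # successfully but no longer round-trips to the same string.
        uuid_str = str(UUID(str_or_uuid, version=4))
    except ValueError:
        return False
    return uuid_str == str_or_uuid


# Arbitrary example value (assumption): version nibble 4, RFC 4122 variant.
SAMPLE_V4 = "8f14e45f-ceea-467f-a9d1-3b5c5d0f7a11"

results = {
    "uuid4 object": is_uuid4(uuid4()),
    "canonical v4 string": is_uuid4(SAMPLE_V4),
    "uppercase v4 string": is_uuid4(SAMPLE_V4.upper()),
    "v4 string without hyphens": is_uuid4(SAMPLE_V4.replace("-", "")),
    "uuid1 string": is_uuid4(str(uuid1())),
    "not a UUID at all": is_uuid4("not-a-uuid"),
    "non-string input": is_uuid4(12345),
}

for label, outcome in results.items():
    print(f"{label}: {outcome}")
```

Only the first two cases are accepted. Callers that pass a `requestId` in statement parameters would therefore need to supply the lowercase, hyphenated form.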
4 changes: 0 additions & 4 deletions src/snowflake/connector/aio/__init__.py
@@ -1,7 +1,3 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

from ._connection import SnowflakeConnection
8 changes: 0 additions & 8 deletions src/snowflake/connector/aio/_azure_storage_client.py
@@ -1,7 +1,3 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

import base64
@@ -15,7 +11,6 @@

import aiohttp

from ..azure_storage_client import AzureCredentialFilter
from ..azure_storage_client import (
SnowflakeAzureRestClient as SnowflakeAzureRestClientSync,
)
@@ -37,8 +32,6 @@

logger = getLogger(__name__)

getLogger("aiohttp").addFilter(AzureCredentialFilter())


class SnowflakeAzureRestClient(
SnowflakeStorageClientAsync, SnowflakeAzureRestClientSync
@@ -49,7 +42,6 @@ def __init__(
credentials: StorageCredential | None,
chunk_size: int,
stage_info: dict[str, Any],
use_s3_regional_url: bool = False,
unsafe_file_write: bool = False,
) -> None:
SnowflakeAzureRestClientSync.__init__(
4 changes: 1 addition & 3 deletions src/snowflake/connector/aio/_build_upload_agent.py
@@ -1,7 +1,5 @@
#!/usr/bin/env python
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#


from __future__ import annotations

44 changes: 36 additions & 8 deletions src/snowflake/connector/aio/_connection.py
@@ -1,6 +1,3 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from __future__ import annotations

import asyncio
@@ -35,6 +32,7 @@
from ..connection import _get_private_bytes_from_file
from ..constants import (
_CONNECTIVITY_ERR_MSG,
ENV_VAR_EXPERIMENTAL_AUTHENTICATION,
ENV_VAR_PARTNER,
PARAMETER_AUTOCOMMIT,
PARAMETER_CLIENT_PREFETCH_THREADS,
@@ -55,6 +53,7 @@
ER_CONNECTION_IS_CLOSED,
ER_FAILED_TO_CONNECT_TO_DB,
ER_INVALID_VALUE,
ER_INVALID_WIF_SETTINGS,
)
from ..network import (
DEFAULT_AUTHENTICATOR,
@@ -64,14 +63,17 @@
PROGRAMMATIC_ACCESS_TOKEN,
REQUEST_ID,
USR_PWD_MFA_AUTHENTICATOR,
WORKLOAD_IDENTITY_AUTHENTICATOR,
ReauthenticationRequest,
)
from ..sqlstate import SQLSTATE_CONNECTION_NOT_EXISTS, SQLSTATE_FEATURE_NOT_SUPPORTED
from ..telemetry import TelemetryData, TelemetryField
from ..time_util import get_time_millis
from ..util_text import split_statements
from ..wif_util import AttestationProvider
from ._cursor import SnowflakeCursor
from ._description import CLIENT_NAME
from ._direct_file_operation_utils import FileOperationParser, StreamDownloader
from ._network import SnowflakeRestful
from ._telemetry import TelemetryClient
from ._time_util import HeartBeatTimer
@@ -87,6 +89,7 @@
AuthByPlugin,
AuthByUsrPwdMfa,
AuthByWebBrowser,
AuthByWorkloadIdentity,
)

logger = getLogger(__name__)
@@ -116,6 +119,10 @@ def __init__(
# check SNOW-1218851 for long term improvement plan to refactor ocsp code
atexit.register(self._close_at_exit)

# Set up the file operation parser and stream downloader.
self._file_operation_parser = FileOperationParser(self)
self._stream_downloader = StreamDownloader(self)

def __enter__(self):
# async connection does not support sync context manager
raise TypeError(
@@ -301,8 +308,6 @@ async def __open_connection(self):
backoff_generator=self._backoff_generator,
)
elif self._authenticator == PROGRAMMATIC_ACCESS_TOKEN:
if not self._token and self._password:
self._token = self._password
self.auth_class = AuthByPAT(self._token)
elif self._authenticator == USR_PWD_MFA_AUTHENTICATOR:
self._session_parameters[PARAMETER_CLIENT_REQUEST_MFA_TOKEN] = (
@@ -320,6 +325,29 @@ async def __open_connection(self):
timeout=self.login_timeout,
backoff_generator=self._backoff_generator,
)
elif self._authenticator == WORKLOAD_IDENTITY_AUTHENTICATOR:
if ENV_VAR_EXPERIMENTAL_AUTHENTICATION not in os.environ:
Error.errorhandler_wrapper(
self,
None,
ProgrammingError,
{
"msg": f"Please set the '{ENV_VAR_EXPERIMENTAL_AUTHENTICATION}' environment variable to use the '{WORKLOAD_IDENTITY_AUTHENTICATOR}' authenticator.",
"errno": ER_INVALID_WIF_SETTINGS,
},
)
# Standardize the provider enum.
if self._workload_identity_provider and isinstance(
self._workload_identity_provider, str
):
self._workload_identity_provider = AttestationProvider.from_string(
self._workload_identity_provider
)
self.auth_class = AuthByWorkloadIdentity(
provider=self._workload_identity_provider,
token=self._token,
entra_resource=self._workload_identity_entra_resource,
)
else:
# okta URL, e.g., https://<account>.okta.com/
self.auth_class = AuthByOkta(
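
The workload identity branch in the hunk above does two things before it constructs `AuthByWorkloadIdentity`: it refuses to proceed unless an opt-in environment variable is present, and it normalizes a string provider into an enum. A minimal sketch of that pattern follows. `EXAMPLE_ENV_VAR`, `Provider`, and `resolve_provider` are hypothetical stand-ins: the actual value of `ENV_VAR_EXPERIMENTAL_AUTHENTICATION`, the members of `AttestationProvider`, and the behaviour of `AttestationProvider.from_string` are not visible in this diff. The sketch also raises a plain `RuntimeError` where the connector goes through `Error.errorhandler_wrapper` with `ProgrammingError`.

```python
from __future__ import annotations

import os
from enum import Enum
from typing import Mapping

# Hypothetical name; the real constant's value is not shown in the diff.
EXAMPLE_ENV_VAR = "EXAMPLE_ENABLE_EXPERIMENTAL_AUTHENTICATION"


class Provider(Enum):
    """Stand-in for AttestationProvider; member names are assumptions."""

    AWS = "AWS"
    AZURE = "AZURE"
    GCP = "GCP"

    @staticmethod
    def from_string(value: str) -> "Provider":
        # Assumed behaviour: case-insensitive lookup by member name.
        return Provider[value.upper()]


def resolve_provider(
    provider: str | Provider | None,
    environ: Mapping[str, str] | None = None,
) -> Provider | None:
    """Gate on the opt-in variable, then standardize the provider value."""
    environ = os.environ if environ is None else environ
    # As in the hunk, only the presence of the variable is checked.
    if EXAMPLE_ENV_VAR not in environ:
        raise RuntimeError(
            f"Please set the '{EXAMPLE_ENV_VAR}' environment variable "
            "to use the workload identity authenticator."
        )
    if provider and isinstance(provider, str):
        provider = Provider.from_string(provider)
    return provider
```

One consequence of the presence-only check is that setting the variable to any value, including `false`, enables the branch. If that is not intended, the upstream check may deserve a follow-up.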
@@ -767,7 +795,7 @@ async def close(self, retry: bool = True) -> None:
await self._cancel_heartbeat()

# close telemetry first, since it needs rest to send remaining data
logger.info("closed")
logger.debug("closed")

await self._telemetry.close(
send_on_close=bool(retry and self.telemetry_enabled)
@@ -776,15 +804,15 @@ async def close(self, retry: bool = True) -> None:
await self._all_async_queries_finished()
and not self._server_session_keep_alive
):
logger.info("No async queries seem to be running, deleting session")
logger.debug("No async queries seem to be running, deleting session")
try:
await self.rest.delete_session(retry=retry)
except Exception as e:
logger.debug(
"Exception encountered in deleting session. ignoring...: %s", e
)
else:
logger.info(
logger.debug(
"There are {} async queries still running, not deleting session".format(
len(self._async_sfqids)
)
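
With the three `close()` messages lowered from info to debug, applications that relied on seeing them would need to opt in to DEBUG output. The sketch below shows one way to do that with the standard `logging` module. It assumes the module logger is named after its import path, as the `getLogger(__name__)` call in this file suggests, so it is a child of `snowflake.connector`. `ListHandler` and the simulated `closed` call are illustrative only; no connector code is imported.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def capture_connector_debug_logs() -> ListHandler:
    """Enable DEBUG on the package logger and attach an in-memory handler."""
    handler = ListHandler()
    package_logger = logging.getLogger("snowflake.connector")
    package_logger.setLevel(logging.DEBUG)
    package_logger.addHandler(handler)
    return handler


handler = capture_connector_debug_logs()

# Stand-in for the module logger in aio/_connection.py (name is an assumption
# derived from the file path); its records propagate to the package logger.
module_logger = logging.getLogger("snowflake.connector.aio._connection")
module_logger.debug("closed")

print(handler.messages)
```

Without the `setLevel(logging.DEBUG)` call, the default effective level of WARNING would drop the record before any handler saw it.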