Skip to content

Commit d13e4e2

Browse files
sfc-gh-yuwang, sfc-gh-aling
authored and committed
SNOW-1617451: async telemetry support and client name change (#2077)
1 parent 075fde0 commit d13e4e2

File tree

10 files changed

+398
-73
lines changed

10 files changed

+398
-73
lines changed

src/snowflake/connector/aio/_connection.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import asyncio
77
import atexit
8+
import copy
89
import logging
910
import os
1011
import pathlib
@@ -29,7 +30,7 @@
2930
from .._query_context_cache import QueryContextCache
3031
from ..compat import IS_LINUX, quote, urlencode
3132
from ..config_manager import CONFIG_MANAGER, _get_default_connection_params
32-
from ..connection import DEFAULT_CONFIGURATION
33+
from ..connection import DEFAULT_CONFIGURATION as DEFAULT_CONFIGURATION_SYNC
3334
from ..connection import SnowflakeConnection as SnowflakeConnectionSync
3435
from ..connection import _get_private_bytes_from_file
3536
from ..connection_diagnostic import ConnectionDiagnostic
@@ -69,7 +70,9 @@
6970
from ..time_util import get_time_millis
7071
from ..util_text import split_statements
7172
from ._cursor import SnowflakeCursor
73+
from ._description import CLIENT_NAME
7274
from ._network import SnowflakeRestful
75+
from ._telemetry import TelemetryClient
7376
from ._time_util import HeartBeatTimer
7477
from .auth import (
7578
FIRST_PARTY_AUTHENTICATORS,
@@ -86,6 +89,10 @@
8689

8790
logger = getLogger(__name__)
8891

92+
# deep copy so that async-specific overrides do not pollute the sync connector's default configuration
93+
DEFAULT_CONFIGURATION = copy.deepcopy(DEFAULT_CONFIGURATION_SYNC)
94+
DEFAULT_CONFIGURATION["application"] = (CLIENT_NAME, (type(None), str))
95+
8996

9097
class SnowflakeConnection(SnowflakeConnectionSync):
9198
OCSP_ENV_LOCK = asyncio.Lock()
@@ -103,11 +110,7 @@ def __init__(
103110
kwargs, connection_name, connections_file_path
104111
)
105112
self._connected = False
106-
# TODO: async telemetry support
107-
self._telemetry = None
108113
self.expired = False
109-
# get the imported modules from sys.modules
110-
# self._log_telemetry_imported_packages() # TODO: async telemetry support
111114
# check SNOW-1218851 for long term improvement plan to refactor ocsp code
112115
atexit.register(self._close_at_exit)
113116

@@ -569,7 +572,8 @@ async def _get_query_status(
569572
return status_ret, status_resp
570573

571574
async def _log_telemetry(self, telemetry_data) -> None:
572-
raise NotImplementedError("asyncio telemetry is not supported")
575+
if self.telemetry_enabled:
576+
await self._telemetry.try_add_log_to_batch(telemetry_data)
573577

574578
async def _log_telemetry_imported_packages(self) -> None:
575579
if self._log_imported_packages_in_telemetry:
@@ -722,8 +726,9 @@ async def close(self, retry: bool = True) -> None:
722726
# close telemetry first, since it needs rest to send remaining data
723727
logger.info("closed")
724728

725-
# TODO: async telemetry support
726-
# self._telemetry.close(send_on_close=bool(retry and self.telemetry_enabled))
729+
await self._telemetry.close(
730+
send_on_close=bool(retry and self.telemetry_enabled)
731+
)
727732
if (
728733
await self._all_async_queries_finished()
729734
and not self._server_session_keep_alive
@@ -889,6 +894,8 @@ async def connect(self, **kwargs) -> None:
889894
raise Exception(str(exceptions_dict))
890895
else:
891896
await self.__open_connection()
897+
self._telemetry = TelemetryClient(self._rest)
898+
await self._log_telemetry_imported_packages()
892899

893900
def cursor(
894901
self, cursor_class: type[SnowflakeCursor] = SnowflakeCursor

src/snowflake/connector/aio/_cursor.py

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
)
5656
from snowflake.connector.errors import BindUploadError, DatabaseError
5757
from snowflake.connector.file_transfer_agent import SnowflakeProgressPercentage
58-
from snowflake.connector.telemetry import TelemetryField
58+
from snowflake.connector.telemetry import TelemetryData, TelemetryField
5959
from snowflake.connector.time_util import get_time_millis
6060

6161
if TYPE_CHECKING:
@@ -361,8 +361,9 @@ async def _init_result_and_meta(self, data: dict[Any, Any]) -> None:
361361
self._total_rowcount += updated_rows
362362

363363
async def _init_multi_statement_results(self, data: dict) -> None:
364-
# TODO: async telemetry SNOW-1572217
365-
# self._log_telemetry_job_data(TelemetryField.MULTI_STATEMENT, TelemetryData.TRUE)
364+
await self._log_telemetry_job_data(
365+
TelemetryField.MULTI_STATEMENT, TelemetryData.TRUE
366+
)
366367
self.multi_statement_savedIds = data["resultIds"].split(",")
367368
self._multi_statement_resultIds = collections.deque(
368369
self.multi_statement_savedIds
@@ -382,8 +383,24 @@ async def _init_multi_statement_results(self, data: dict) -> None:
382383
async def _log_telemetry_job_data(
383384
self, telemetry_field: TelemetryField, value: Any
384385
) -> None:
385-
# TODO: async telemetry SNOW-1572217
386-
pass
386+
ts = get_time_millis()
387+
try:
388+
await self._connection._log_telemetry(
389+
TelemetryData.from_telemetry_data_dict(
390+
from_dict={
391+
TelemetryField.KEY_TYPE.value: telemetry_field.value,
392+
TelemetryField.KEY_SFQID.value: self._sfqid,
393+
TelemetryField.KEY_VALUE.value: value,
394+
},
395+
timestamp=ts,
396+
connection=self._connection,
397+
)
398+
)
399+
except AttributeError:
400+
logger.warning(
401+
"Cursor failed to log to telemetry. Connection object may be None.",
402+
exc_info=True,
403+
)
387404

388405
async def _preprocess_pyformat_query(
389406
self,
@@ -394,16 +411,15 @@ async def _preprocess_pyformat_query(
394411
# client side binding
395412
processed_params = self._connection._process_params_pyformat(params, self)
396413
# SNOW-513061 collect telemetry for empty sequence usage before we make the breaking change announcement
397-
# TODO: async telemetry support
398-
# if params is not None and len(params) == 0:
399-
# await self._log_telemetry_job_data(
400-
# TelemetryField.EMPTY_SEQ_INTERPOLATION,
401-
# (
402-
# TelemetryData.TRUE
403-
# if self.connection._interpolate_empty_sequences
404-
# else TelemetryData.FALSE
405-
# ),
406-
# )
414+
if params is not None and len(params) == 0:
415+
await self._log_telemetry_job_data(
416+
TelemetryField.EMPTY_SEQ_INTERPOLATION,
417+
(
418+
TelemetryData.TRUE
419+
if self.connection._interpolate_empty_sequences
420+
else TelemetryData.FALSE
421+
),
422+
)
407423
if logger.getEffectiveLevel() <= logging.DEBUG:
408424
logger.debug(
409425
f"binding: [{self._format_query_for_log(command)}] "
@@ -585,14 +601,13 @@ async def execute(
585601
self._first_chunk_time = get_time_millis()
586602

587603
# if server gives a send time, log the time it took to arrive
588-
# TODO: telemetry support in asyncio
589-
# if "data" in ret and "sendResultTime" in ret["data"]:
590-
# time_consume_first_result = (
591-
# self._first_chunk_time - ret["data"]["sendResultTime"]
592-
# )
593-
# self._log_telemetry_job_data(
594-
# TelemetryField.TIME_CONSUME_FIRST_RESULT, time_consume_first_result
595-
# )
604+
if "data" in ret and "sendResultTime" in ret["data"]:
605+
time_consume_first_result = (
606+
self._first_chunk_time - ret["data"]["sendResultTime"]
607+
)
608+
await self._log_telemetry_job_data(
609+
TelemetryField.TIME_CONSUME_FIRST_RESULT, time_consume_first_result
610+
)
596611

597612
if ret["success"]:
598613
logger.debug("SUCCESS")
@@ -893,10 +908,9 @@ async def fetch_arrow_batches(self) -> AsyncIterator[Table]:
893908
await self._prefetch_hook()
894909
if self._query_result_format != "arrow":
895910
raise NotSupportedError
896-
# TODO: async telemetry SNOW-1572217
897-
# self._log_telemetry_job_data(
898-
# TelemetryField.ARROW_FETCH_BATCHES, TelemetryData.TRUE
899-
# )
911+
await self._log_telemetry_job_data(
912+
TelemetryField.ARROW_FETCH_BATCHES, TelemetryData.TRUE
913+
)
900914
return await self._result_set._fetch_arrow_batches()
901915

902916
@overload
@@ -920,8 +934,9 @@ async def fetch_arrow_all(self, force_return_table: bool = False) -> Table | Non
920934
await self._prefetch_hook()
921935
if self._query_result_format != "arrow":
922936
raise NotSupportedError
923-
# TODO: async telemetry SNOW-1572217
924-
# self._log_telemetry_job_data(TelemetryField.ARROW_FETCH_ALL, TelemetryData.TRUE)
937+
await self._log_telemetry_job_data(
938+
TelemetryField.ARROW_FETCH_ALL, TelemetryData.TRUE
939+
)
925940
return await self._result_set._fetch_arrow_all(
926941
force_return_table=force_return_table
927942
)
@@ -933,10 +948,9 @@ async def fetch_pandas_batches(self, **kwargs: Any) -> AsyncIterator[DataFrame]:
933948
await self._prefetch_hook()
934949
if self._query_result_format != "arrow":
935950
raise NotSupportedError
936-
# TODO: async telemetry
937-
# self._log_telemetry_job_data(
938-
# TelemetryField.PANDAS_FETCH_BATCHES, TelemetryData.TRUE
939-
# )
951+
await self._log_telemetry_job_data(
952+
TelemetryField.PANDAS_FETCH_BATCHES, TelemetryData.TRUE
953+
)
940954
return await self._result_set._fetch_pandas_batches(**kwargs)
941955

942956
async def fetch_pandas_all(self, **kwargs: Any) -> DataFrame:
@@ -946,9 +960,9 @@ async def fetch_pandas_all(self, **kwargs: Any) -> DataFrame:
946960
if self._query_result_format != "arrow":
947961
raise NotSupportedError
948962
# record pandas fetch-all usage in telemetry
949-
# self._log_telemetry_job_data(
950-
# TelemetryField.PANDAS_FETCH_ALL, TelemetryData.TRUE
951-
# )
963+
await self._log_telemetry_job_data(
964+
TelemetryField.PANDAS_FETCH_ALL, TelemetryData.TRUE
965+
)
952966
return await self._result_set._fetch_pandas_all(**kwargs)
953967

954968
async def nextset(self) -> SnowflakeCursor | None:
@@ -981,9 +995,9 @@ async def get_result_batches(self) -> list[ResultBatch] | None:
981995
if self._result_set is None:
982996
return None
983997
# record result-batch (partition) retrieval in telemetry
984-
# self._log_telemetry_job_data(
985-
# TelemetryField.GET_PARTITIONS_USED, TelemetryData.TRUE
986-
# )
998+
await self._log_telemetry_job_data(
999+
TelemetryField.GET_PARTITIONS_USED, TelemetryData.TRUE
1000+
)
9871001
return self._result_set.batches
9881002

9891003
async def get_results_from_sfqid(self, sfqid: str) -> None:
src/snowflake/connector/aio/_description.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#
2+
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
3+
#
4+
5+
"""Various constants."""
6+
7+
from __future__ import annotations
8+
9+
CLIENT_NAME = "AsyncioPythonConnector" # don't change!

src/snowflake/connector/aio/_network.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,19 @@
6060
HEADER_AUTHORIZATION_KEY,
6161
HEADER_SNOWFLAKE_TOKEN,
6262
ID_TOKEN_EXPIRED_GS_CODE,
63+
IMPLEMENTATION,
6364
MASTER_TOKEN_EXPIRED_GS_CODE,
6465
MASTER_TOKEN_INVALD_GS_CODE,
6566
MASTER_TOKEN_NOTFOUND_GS_CODE,
6667
NO_TOKEN,
67-
PYTHON_CONNECTOR_USER_AGENT,
68+
PLATFORM,
69+
PYTHON_VERSION,
6870
QUERY_IN_PROGRESS_ASYNC_CODE,
6971
QUERY_IN_PROGRESS_CODE,
7072
REQUEST_ID,
7173
REQUEST_TYPE_RENEW,
7274
SESSION_EXPIRED_GS_CODE,
75+
SNOWFLAKE_CONNECTOR_VERSION,
7376
ReauthenticationRequest,
7477
RetryRequest,
7578
)
@@ -83,13 +86,16 @@
8386
SQLSTATE_CONNECTION_WAS_NOT_ESTABLISHED,
8487
)
8588
from ..time_util import TimeoutBackoffCtx
89+
from ._description import CLIENT_NAME
8690
from ._ssl_connector import SnowflakeSSLConnector
8791

8892
if TYPE_CHECKING:
8993
from snowflake.connector.aio import SnowflakeConnection
9094

9195
logger = logging.getLogger(__name__)
9296

97+
PYTHON_CONNECTOR_USER_AGENT = f"{CLIENT_NAME}/{SNOWFLAKE_CONNECTOR_VERSION} ({PLATFORM}) {IMPLEMENTATION}/{PYTHON_VERSION}"
98+
9399
try:
94100
import aiohttp
95101
except ImportError:
src/snowflake/connector/aio/_telemetry.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
4+
#
5+
6+
from __future__ import annotations
7+
8+
import logging
9+
from asyncio import Lock
10+
from typing import TYPE_CHECKING
11+
12+
from ..secret_detector import SecretDetector
13+
from ..telemetry import TelemetryClient as TelemetryClientSync
14+
from ..telemetry import TelemetryData
15+
from ..test_util import ENABLE_TELEMETRY_LOG, rt_plain_logger
16+
17+
if TYPE_CHECKING:
18+
from ._network import SnowflakeRestful
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
class TelemetryClient(TelemetryClientSync):
    """Client to enqueue and send metrics to the telemetry endpoint in batch.

    Asyncio flavour of the telemetry client: every public operation is a
    coroutine, the pending batch is guarded by an ``asyncio.Lock`` and the
    upload awaits ``self._rest.request``.

    ``is_closed``, ``_enabled``, ``_log_batch``, ``_flush_size`` and
    ``SF_PATH_TELEMETRY`` are not defined in this class; they come from the
    base client.
    """

    def __init__(self, rest: SnowflakeRestful, flush_size=None) -> None:
        """Initialise the base client, then install an asyncio lock.

        Args:
            rest: REST object used to post batches; its ``request`` method is
                awaited by ``send_batch``.
            flush_size: forwarded unchanged to the base initializer.
        """
        super().__init__(rest, flush_size)
        # Assigned after the base initializer so that this is the lock used by
        # the ``async with`` blocks below (presumably replacing a threading
        # lock set by the sync client - confirm against the base class).
        self._lock = Lock()

    async def add_log_to_batch(self, telemetry_data: TelemetryData) -> None:
        """Append one entry to the pending batch, flushing when it is full.

        Args:
            telemetry_data: entry to enqueue.

        Raises:
            Exception: if the client is already closed.
        """
        if self.is_closed:
            raise Exception("Attempted to add log when TelemetryClient is closed")
        elif not self._enabled:
            logger.debug("TelemetryClient disabled. Ignoring log.")
            return

        async with self._lock:
            self._log_batch.append(telemetry_data)

        # Checked outside the lock: send_batch acquires the same lock itself.
        if len(self._log_batch) >= self._flush_size:
            await self.send_batch()

    async def send_batch(self) -> None:
        """Upload every queued entry in a single request and empty the queue.

        The queue is emptied before the upload, so entries are not re-queued
        when the upload fails. A non-success response or any exception from
        the request disables telemetry (``_enabled = False``) and is only
        logged.

        Raises:
            Exception: if the client is already closed.
        """
        if self.is_closed:
            raise Exception("Attempted to send batch when TelemetryClient is closed")
        elif not self._enabled:
            logger.debug("TelemetryClient disabled. Not sending logs.")
            return

        # Swap the batch out under the lock so the network call runs without
        # holding it.
        async with self._lock:
            to_send = self._log_batch
            self._log_batch = []

        if not to_send:
            logger.debug("Nothing to send to telemetry.")
            return

        body = {"logs": [x.to_dict() for x in to_send]}
        logger.debug(
            "Sending %d logs to telemetry. Data is %s.",
            # Count the queued entries; ``len(body)`` is the number of keys in
            # the payload dict and would always report 1.
            len(to_send),
            # Index 1 of the result is what gets logged (presumably the masked
            # text - confirm against SecretDetector.mask_secrets).
            SecretDetector.mask_secrets(str(body))[1],
        )
        if ENABLE_TELEMETRY_LOG:
            # This logger guarantees the payload won't be masked. For testing
            # purposes only.
            rt_plain_logger.debug(f"Inband telemetry data being sent is {body}")
        try:
            ret = await self._rest.request(
                TelemetryClient.SF_PATH_TELEMETRY,
                body=body,
                method="post",
                client=None,
                timeout=5,
            )
            if not ret["success"]:
                logger.info(
                    "Non-success response from telemetry server: %s. "
                    "Disabling telemetry.",
                    str(ret),
                )
                self._enabled = False
            else:
                logger.debug("Successfully uploading metrics to telemetry.")
        except Exception:
            # Best effort: a telemetry failure must not break the caller.
            self._enabled = False
            logger.debug("Failed to upload metrics to telemetry.", exc_info=True)

    async def try_add_log_to_batch(self, telemetry_data: TelemetryData) -> None:
        """Enqueue an entry, logging a warning instead of raising on failure.

        Args:
            telemetry_data: entry to enqueue.
        """
        try:
            await self.add_log_to_batch(telemetry_data)
        except Exception:
            logger.warning("Failed to add log to telemetry.", exc_info=True)

    async def close(self, send_on_close: bool = True) -> None:
        """Close the client, optionally flushing the pending batch first.

        Does nothing when the client is already closed.

        Args:
            send_on_close: when true, ``send_batch`` is awaited before the
                REST reference is dropped.
        """
        if not self.is_closed:
            logger.debug("Closing telemetry client.")
            if send_on_close:
                await self.send_batch()
            # Drop the REST reference (presumably what ``is_closed`` inspects -
            # confirm in the base client).
            self._rest = None

0 commit comments

Comments
 (0)