Skip to content

Commit 32cb970

Browse files
Adding telemetry to stored procs (#3563)
1 parent c153c70 commit 32cb970

File tree

3 files changed

+213
-7
lines changed

3 files changed

+213
-7
lines changed

src/snowflake/snowpark/_internal/telemetry.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
import threading
99
from enum import Enum, unique
10+
from logging import getLogger
1011
import time
1112
from typing import Any, Dict, List, Optional
1213

@@ -35,6 +36,7 @@
3536
get_version,
3637
is_in_stored_procedure,
3738
is_interactive,
39+
generate_random_alphanumeric,
3840
)
3941

4042
try:
@@ -44,6 +46,8 @@
4446
except ImportError:
4547
PS_UTIL_AVAILABLE = False
4648

49+
_logger = getLogger(__name__)
50+
4751

4852
@unique
4953
class TelemetryField(Enum):
@@ -474,13 +478,50 @@ def __init__(self, conn: SnowflakeConnection) -> None:
474478
self.python_version: str = get_python_version()
475479
self.os: str = get_os_name()
476480
self.is_interactive = is_interactive()
481+
self._enabled = True
482+
483+
# Initializing telemetry client for stored procedures
484+
# In stored procs, we can't import this package at the top level, so we need to do it here
485+
internal_metrics_available = None
486+
if is_in_stored_procedure():
487+
try:
488+
from _snowflake import internal_metrics
489+
490+
internal_metrics_available = True
491+
except ImportError:
492+
internal_metrics_available = False
493+
self.stored_proc_meter = (
494+
internal_metrics.get_meter("snowpark-python-client")
495+
if internal_metrics_available
496+
else None
497+
)
498+
# We periodically clean out the stored procedure meter of unused gauges
499+
self.clean_up_stored_proc_meter_interval = 1000
477500

478501
def send(self, msg: Dict, timestamp: Optional[int] = None):
    """Emit one telemetry message through whichever sink this client has.

    Args:
        msg: The telemetry payload. On the stored-procedure path it is
            serialized to compact JSON, so it must be JSON-serializable.
        timestamp: Timestamp to attach to the message. When falsy, the
            current time from ``get_time_millis()`` is used.

    Nothing is sent when the client is disabled (``self._enabled`` is false).
    Otherwise the message goes to ``self.telemetry`` when that is set, or is
    recorded as a gauge on ``self.stored_proc_meter`` when only the meter is
    available. If neither sink exists the call is a no-op.
    """
    if not self._enabled:
        _logger.info("Telemetry client is disabled, skipping telemetry")
        return
    if not timestamp:
        timestamp = get_time_millis()

    if self.telemetry:
        telemetry_data = PCTelemetryData(message=msg, timestamp=timestamp)
        self.telemetry.try_add_log_to_batch(telemetry_data)
        return

    meter = self.stored_proc_meter
    if meter is None:
        return

    # Telemetry is best-effort: a payload that cannot be serialized, or a
    # change in the meter's private attributes, must not fail the caller.
    try:
        gauge_id = generate_random_alphanumeric(10)
        # The message itself travels in the gauge description; the gauge
        # value carries no information.
        meter.create_gauge(
            f"snowflake.snowpark.client.gauge{gauge_id}",
            description=json.dumps(msg, ensure_ascii=False, separators=(",", ":")),
            unit="data",
        ).set(
            200
        )  # this is a dummy value
        # Every send registers a new gauge, so drop all registered gauges
        # once the configured threshold is reached.
        if len(meter._instrument_id_instrument) >= self.clean_up_stored_proc_meter_interval:
            with meter._instrument_id_instrument_lock:
                meter._instrument_id_instrument.clear()
    except Exception as exc:
        _logger.debug("Failed to record stored procedure telemetry: %s", exc)
484525

485526
def _create_basic_telemetry_data(self, telemetry_type: str) -> Dict[str, Any]:
486527
message = {

src/snowflake/snowpark/session.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@
298298
_PYTHON_SNOWPARK_GENERATE_MULTILINE_QUERIES = (
299299
"PYTHON_SNOWPARK_GENERATE_MULTILINE_QUERIES"
300300
)
301+
_PYTHON_SNOWPARK_INTERNAL_TELEMETRY_ENABLED = "ENABLE_SNOWPARK_FIRST_PARTY_TELEMETRY"
301302

302303
# AST encoding.
303304
_PYTHON_SNOWPARK_USE_AST = "PYTHON_SNOWPARK_USE_AST"
@@ -675,6 +676,12 @@ def __init__(
675676
else:
676677
self._disable_multiline_queries()
677678

679+
self._internal_telemetry_enabled: bool = (
680+
self._conn._get_client_side_session_parameter(
681+
_PYTHON_SNOWPARK_INTERNAL_TELEMETRY_ENABLED, False
682+
)
683+
)
684+
678685
self._large_query_breakdown_enabled: bool = self.is_feature_enabled_for_version(
679686
_PYTHON_SNOWPARK_USE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_VERSION
680687
)
@@ -4083,17 +4090,21 @@ def telemetry_enabled(self) -> bool:
40834090
>>> session.telemetry_enabled
40844091
True
40854092
"""
4086-
return self._conn._conn.telemetry_enabled
4093+
return self._conn._telemetry_client._enabled
40874094

40884095
@telemetry_enabled.setter
def telemetry_enabled(self, value):
    """Enable or disable telemetry for this session.

    Only the Snowpark telemetry client's ``_enabled`` flag is toggled here.
    Inside a stored procedure, telemetry can only be turned on when the
    client-side parameter ENABLE_SNOWPARK_FIRST_PARTY_TELEMETRY allows it;
    otherwise the request is logged at debug level and telemetry stays off.
    """
    telemetry_client = self._conn._telemetry_client
    if not value:
        telemetry_client._enabled = False
        return

    if is_in_stored_procedure():
        # __init__ stores the parameter as ``_internal_telemetry_enabled``.
        # ``_stored_proc_telemetry_enabled`` is the name this setter used
        # before and is kept as a fallback. If neither attribute exists yet
        # (e.g. the setter runs before __init__ has read the parameter),
        # treat first-party telemetry as not allowed.
        allowed = getattr(self, "_internal_telemetry_enabled", None)
        if allowed is None:
            allowed = getattr(self, "_stored_proc_telemetry_enabled", False)
        if not allowed:
            _logger.debug(
                "Client side parameter ENABLE_SNOWPARK_FIRST_PARTY_TELEMETRY is set to False, telemetry could not be enabled"
            )
            telemetry_client._enabled = False
            return

    telemetry_client._enabled = True
40974108

40984109
@property
40994110
def file(self) -> FileOperation:
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
4+
#
5+
6+
import threading
7+
from unittest.mock import patch, MagicMock
8+
from typing import Dict
9+
import json
10+
11+
from snowflake.snowpark._internal.telemetry import TelemetryClient
12+
from snowflake.snowpark import Session
13+
14+
15+
class MockGauge:
    """Minimal stand-in for a meter gauge that remembers the last value set."""

    def __init__(self, name: str, unit: str = "", description: str = "") -> None:
        self.name = name
        self.unit = unit
        self.description = description
        # No value has been recorded until set() is called.
        self._value = None

    def set(self, value) -> "MockGauge":
        """Record ``value`` and return this gauge so calls can be chained."""
        self._value = value
        return self

    def get_value(self):
        """Return the most recently recorded value, or None if never set."""
        return self._value
28+
29+
30+
class MockMeter:
    """Minimal stand-in for the stored-procedure meter used by TelemetryClient.

    It exposes the two private attributes that TelemetryClient.send() reads:
    the instrument registry and the lock guarding it.
    """

    def __init__(self) -> None:
        self._instrument_id_instrument: Dict[str, MockGauge] = {}
        self._instrument_id_instrument_lock = threading.Lock()

    def create_gauge(self, name: str, unit: str = "", description: str = ""):
        """Create a MockGauge, register it under its id, and return it.

        The id combines name, unit and description, so two gauges that differ
        in any of the three are stored as separate entries.
        """
        instrument = MockGauge(name, unit, description)
        instrument_id = f"{name},{unit},{description}"

        with self._instrument_id_instrument_lock:
            self._instrument_id_instrument[instrument_id] = instrument
        return instrument
46+
47+
48+
@patch("snowflake.snowpark._internal.telemetry.is_in_stored_procedure")
def test_telemetry_client_with_mock_meter(mock_is_in_stored_proc):
    """A single send() through the meter records one gauge carrying the message."""
    mock_is_in_stored_proc.return_value = True
    mock_meter = MockMeter()
    mock_conn = MagicMock()
    client = TelemetryClient(mock_conn)
    client._enabled = True
    # Clear the in-band sink so send() takes the stored-procedure meter path.
    client.telemetry = None
    client.stored_proc_meter = mock_meter
    client.clean_up_stored_proc_meter_interval = 200
    test_message = {"test": "data", "func_name": "test_function"}

    client.send(test_message)

    assert len(mock_meter._instrument_id_instrument) == 1
    (gauge,) = mock_meter._instrument_id_instrument.values()
    assert "snowflake.snowpark.client.gauge" in gauge.name
    assert gauge.unit == "data"
    assert gauge.description == json.dumps(
        test_message, ensure_ascii=False, separators=(",", ":")
    )
    assert gauge.get_value() == 200
68+
69+
70+
@patch("snowflake.snowpark._internal.telemetry.is_in_stored_procedure")
def test_telemetry_client_multiple_sends(mock_is_in_stored_proc):
    """Each send() below the cleanup threshold adds its own gauge to the meter."""
    mock_is_in_stored_proc.return_value = True
    mock_meter = MockMeter()
    mock_conn = MagicMock()
    client = TelemetryClient(mock_conn)
    client._enabled = True
    # Clear the in-band sink so send() takes the stored-procedure meter path.
    client.telemetry = None
    client.stored_proc_meter = mock_meter
    # Keep the threshold above the number of sends so nothing is cleared.
    client.clean_up_stored_proc_meter_interval = 200

    for i in range(100):
        client.send({"test": f"data_{i}"})

    assert len(mock_meter._instrument_id_instrument) == 100
    for gauge in mock_meter._instrument_id_instrument.values():
        assert "snowflake.snowpark.client.gauge" in gauge.name
        assert gauge.get_value() == 200
87+
88+
89+
@patch("snowflake.snowpark._internal.telemetry.is_in_stored_procedure")
def test_telemetry_client_cleanup(mock_is_in_stored_proc):
    """Test TelemetryClient cleanup mechanism"""
    mock_is_in_stored_proc.return_value = True
    mock_meter = MockMeter()
    mock_conn = MagicMock()
    client = TelemetryClient(mock_conn)
    client._enabled = True
    # Clear the in-band sink so send() takes the stored-procedure meter path.
    client.telemetry = None
    client.stored_proc_meter = mock_meter
    client.clean_up_stored_proc_meter_interval = 100
    threshold = client.clean_up_stored_proc_meter_interval

    # Below the threshold, gauges accumulate one per send.
    for i in range(threshold - 1):
        client.send({"test": f"data_{i}"})
    assert len(mock_meter._instrument_id_instrument) == threshold - 1

    # The send that reaches the threshold clears the whole registry.
    client.send({"test": f"data_{threshold - 1}"})
    assert len(mock_meter._instrument_id_instrument) == 0

    # Recording resumes normally after a cleanup.
    client.send({"test": "data_after_cleanup"})
    assert len(mock_meter._instrument_id_instrument) == 1
105+
106+
107+
@patch("snowflake.snowpark.session.is_in_stored_procedure")
@patch("snowflake.snowpark.session._logger")
def test_internal_telemetry_disabled(mock_logger, mock_is_in_stored_proc):
    """Test that Session logs debug message when telemetry is enabled but stored proc telemetry is disabled"""
    mock_is_in_stored_proc.return_value = True
    mock_session = MagicMock(spec=Session)
    # Session.__init__ stores the client-side parameter as
    # ``_internal_telemetry_enabled``; the setter has also read it as
    # ``_stored_proc_telemetry_enabled``. Set both so the test exercises the
    # "parameter is off" branch whichever name the setter uses.
    mock_session._internal_telemetry_enabled = False
    mock_session._stored_proc_telemetry_enabled = False
    mock_session._conn = MagicMock()
    mock_session._conn._conn = MagicMock()
    mock_session._conn._telemetry_client = MagicMock()

    Session.telemetry_enabled.fset(mock_session, False)
    assert mock_session._conn._telemetry_client._enabled is False

    Session.telemetry_enabled.fset(mock_session, True)
    mock_logger.debug.assert_called_once_with(
        "Client side parameter ENABLE_SNOWPARK_FIRST_PARTY_TELEMETRY is set to False, telemetry could not be enabled"
    )
    # The enable request must not take effect while the parameter is off.
    assert mock_session._conn._telemetry_client._enabled is False
123+
124+
125+
@patch("snowflake.snowpark._internal.telemetry.is_in_stored_procedure")
def test_telemetry_client_internal_metrics_import_fails(mock_is_in_stored_proc):
    """Test TelemetryClient handles ImportError when _snowflake.internal_metrics import fails"""
    mock_is_in_stored_proc.return_value = True
    mock_conn = MagicMock()
    # A None entry in sys.modules makes ``import _snowflake`` raise
    # ImportError. Unlike replacing builtins.__import__, this leaves every
    # other import performed during construction untouched.
    with patch.dict("sys.modules", {"_snowflake": None}):
        client = TelemetryClient(mock_conn)

    assert client is not None
    assert client._enabled is True
    assert client.stored_proc_meter is None
    # Sending must still work when no stored-procedure meter is available.
    test_message = {"test": "data", "func_name": "test_function"}
    client.send(test_message)
139+
140+
141+
@patch("snowflake.snowpark._internal.telemetry.is_in_stored_procedure")
def test_telemetry_client_disabled(mock_is_in_stored_proc):
    """Test that no message is sent when telemetry client is not enabled"""
    mock_is_in_stored_proc.return_value = True
    mock_meter = MockMeter()
    mock_conn = MagicMock()
    client = TelemetryClient(mock_conn)
    client._enabled = False
    # With the in-band sink cleared, the meter is the only place a message
    # could be recorded.
    client.telemetry = None
    client.stored_proc_meter = mock_meter
    client.clean_up_stored_proc_meter_interval = 200
    test_message = {"test": "data", "func_name": "test_function"}

    client.send(test_message)

    assert len(mock_meter._instrument_id_instrument) == 0

0 commit comments

Comments
 (0)