Skip to content

Commit c5b9940

Browse files
Merge branch 'main' into helmeleegy-SNOW-2432713
2 parents fd01458 + 5ac5b71 commit c5b9940

File tree

13 files changed

+720
-583
lines changed

13 files changed

+720
-583
lines changed

CHANGELOG.md

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Release History
22

3+
## 1.42.0 (YYYY-MM-DD)
4+
5+
### Snowpark Python API Updates
6+
7+
#### New Features
8+
9+
- Added support for `Session.udf_profiler`.
10+
311
## 1.41.0 (YYYY-MM-DD)
412

513
### Snowpark Python API Updates
@@ -95,16 +103,6 @@
95103
- `skew()` with `axis=1` or `numeric_only=False` parameters
96104
- `round()` with `decimals` parameter as a Series
97105
- `corr()` with `method!=pearson` parameter
98-
- `shift()` with `suffix` or non-integer `periods` parameters
99-
- `sort_index()` with `axis=1` or `key` parameters
100-
- `sort_values()` with `axis=1`
101-
- `melt()` with `col_level` parameter
102-
- `apply()` with `result_type` parameter for DataFrame
103-
- `pivot_table()` with `sort=True`, non-string `index` list, non-string `columns` list, non-string `values` list, or `aggfunc` dict with non-string values
104-
- `fillna()` with `downcast` parameter or using `limit` together with `value`
105-
- `dropna()` with `axis=1`
106-
107-
108106
- Set `cte_optimization_enabled` to True for all Snowpark pandas sessions.
109107
- Add support for the following in faster pandas:
110108
- `isin`
@@ -166,7 +164,6 @@
166164
- `groupby.var`
167165
- `groupby.nunique`
168166
- `groupby.size`
169-
- `groupby.apply`
170167
- `drop_duplicates`
171168
- Reuse row count from the relaxed query compiler in `get_axis_len`.
172169

src/snowflake/snowpark/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
"QueryListener",
4141
"AsyncJob",
4242
"StoredProcedureProfiler",
43+
"UDFProfiler",
4344
]
4445

4546

@@ -54,6 +55,7 @@
5455
from snowflake.snowpark.async_job import AsyncJob
5556
from snowflake.snowpark.column import CaseExpr, Column
5657
from snowflake.snowpark.stored_procedure_profiler import StoredProcedureProfiler
58+
from snowflake.snowpark.udf_profiler import UDFProfiler
5759
from snowflake.snowpark.dataframe import DataFrame
5860
from snowflake.snowpark.dataframe_ai_functions import DataFrameAIFunctions
5961
from snowflake.snowpark.dataframe_analytics_functions import DataFrameAnalyticsFunctions
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#
2+
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
3+
#
4+
5+
import logging
6+
import threading
7+
from typing import List, Literal, Optional
8+
9+
import snowflake.snowpark
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class SnowparkProfiler:
15+
"""
16+
Base class for stored procedure profiler and UDF profiler
17+
"""
18+
19+
def __init__(
20+
self,
21+
session: "snowflake.snowpark.Session",
22+
) -> None:
23+
self._session = session
24+
self._query_history = None
25+
self._lock = threading.RLock()
26+
self._active_profiler_number = 0
27+
self._has_target_stage = False
28+
self._is_enabled = False
29+
30+
self._active_profiler_name = "ACTIVE_PYTHON_PROFILER"
31+
self._output_sql = ""
32+
self._profiler_module_name = ""
33+
34+
def register_modules(self, modules: Optional[List[str]] = None) -> None:
35+
"""
36+
Register modules to generate profiles for them.
37+
38+
Args:
39+
modules: List of names of stored procedures. Registered modules will be overwritten by this input.
40+
Input None or an empty list will remove registered modules.
41+
"""
42+
module_string = ",".join(modules) if modules is not None else ""
43+
sql_statement = (
44+
f"alter session set {self._profiler_module_name}='{module_string}'"
45+
)
46+
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()
47+
48+
def set_active_profiler(
49+
self, active_profiler_type: Literal["LINE", "MEMORY"] = "LINE"
50+
) -> None:
51+
"""
52+
Set active profiler.
53+
54+
Args:
55+
active_profiler_type: String that represent active_profiler, must be either 'LINE' or 'MEMORY'
56+
(case-insensitive). Active profiler is 'LINE' by default.
57+
58+
"""
59+
if active_profiler_type.upper() not in ["LINE", "MEMORY"]:
60+
raise ValueError(
61+
f"active_profiler expect 'LINE', 'MEMORY', got {active_profiler_type} instead"
62+
)
63+
sql_statement = f"alter session set {self._active_profiler_name} = '{active_profiler_type.upper()}'"
64+
try:
65+
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()
66+
except Exception as e:
67+
logger.warning(
68+
f"Set active profiler failed because of {e}. Active profiler is previously set value or default 'LINE' now."
69+
)
70+
with self._lock:
71+
self._active_profiler_number += 1
72+
if self._query_history is None:
73+
self._query_history = self._session.query_history(
74+
include_thread_id=True, include_error=True
75+
)
76+
self._is_enabled = True
77+
78+
def disable(self) -> None:
79+
"""
80+
Disable profiler.
81+
"""
82+
with self._lock:
83+
self._active_profiler_number -= 1
84+
if self._active_profiler_number == 0:
85+
self._session._conn.remove_query_listener(self._query_history) # type: ignore
86+
self._query_history = None
87+
self._is_enabled = False
88+
sql_statement = f"alter session set {self._active_profiler_name} = ''"
89+
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()
90+
91+
@staticmethod
92+
def _is_procedure_or_function_call(query: str) -> bool:
93+
pass
94+
95+
def _get_last_query_id(self) -> Optional[str]:
96+
current_thread = threading.get_ident()
97+
for query in self._query_history.queries[::-1]: # type: ignore
98+
query_thread = getattr(query, "thread_id", None)
99+
if query_thread == current_thread and self._is_procedure_or_function_call(
100+
query.sql_text
101+
):
102+
return query.query_id
103+
return None
104+
105+
def get_output(self) -> str:
106+
"""
107+
Return the profiles of last executed stored procedure or UDF in current thread. If there is no previous
108+
stored procedure or UDF call, an error will be raised.
109+
110+
Note:
111+
Please call this function right after the stored procedure or UDF you want to profile to avoid any error.
112+
113+
"""
114+
# return empty string when profiler is not enabled to not interrupt user's code
115+
if not self._is_enabled:
116+
logger.warning(
117+
"You are seeing this warning because you try to get profiler output while profiler is disabled. Please use profiler.set_active_profiler() to enable profiler."
118+
)
119+
return ""
120+
query_id = self._get_last_query_id()
121+
if query_id is None:
122+
logger.warning(
123+
"You are seeing this warning because last executed stored procedure or UDF does not exist. Please run the store procedure or UDF before get profiler output."
124+
)
125+
return ""
126+
sql = self._output_sql.format(query_id=query_id)
127+
return self._session.sql(sql)._internal_collect_with_tag_no_telemetry()[0][0] # type: ignore

0 commit comments

Comments
 (0)