
Commit 5d00adf

Adding describe query time to profiler (#3644)
1 parent 3945028 commit 5d00adf

14 files changed: +330 -147 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@
 - `xpath_long`
 - `xpath_short`
 - Added support for parameter `use_vectorized_scanner` in function `Session.write_arrow()`.
+- Dataframe profiler adds the following information about each query: describe query time, execution time, and sql query text. To view this information, call session.dataframe_profiler.enable() and call get_execution_profile on a dataframe.
 
 #### Bug Fixes
 
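The profiler workflow that this changelog entry describes looks roughly as follows. This is a minimal sketch, assuming an already-configured session; the connection parameters and table name are placeholders and are not part of this commit:

```python
from snowflake.snowpark import Session

# Hypothetical connection parameters; substitute real account credentials.
connection_parameters = {"account": "<account>", "user": "<user>", "password": "<password>"}
session = Session.builder.configs(connection_parameters).create()

# Turn on per-dataframe profiling for this session.
session.dataframe_profiler.enable()

df = session.table("my_table").filter("id > 10")  # "my_table" is a placeholder
df.collect()

# Reports the profiled details for this dataframe's queries:
# describe query time, execution time, and SQL query text.
df.get_execution_profile()
```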

src/snowflake/snowpark/_internal/analyzer/schema_utils.py

Lines changed: 23 additions & 12 deletions
@@ -1,9 +1,8 @@
 #
 # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
 #
-import time
 import traceback
-from typing import TYPE_CHECKING, List, Union
+from typing import TYPE_CHECKING, List, Union, Optional
 
 import snowflake.snowpark
 from snowflake.connector.cursor import ResultMetadata, SnowflakeCursor
@@ -12,7 +11,7 @@
 )
 from snowflake.snowpark._internal.analyzer.expression import Attribute
 from snowflake.snowpark._internal.type_utils import convert_metadata_to_sp_type
-from snowflake.snowpark._internal.utils import ttl_cache
+from snowflake.snowpark._internal.utils import ttl_cache, measure_time
 from snowflake.snowpark.types import DecimalType, LongType, StringType
 
 if TYPE_CHECKING:
@@ -68,7 +67,9 @@ def get_attributes() -> List[Attribute]:
 
 
 def analyze_attributes(
-    sql: str, session: "snowflake.snowpark.session.Session"
+    sql: str,
+    session: "snowflake.snowpark.session.Session",
+    dataframe_uuid: Optional[str] = None,
 ) -> List[Attribute]:
     lowercase = sql.strip().lower()
 
@@ -87,29 +88,39 @@
     if lowercase.startswith("get"):
         return get_attributes()
     if lowercase.startswith("describe"):
-        session._run_query(sql)
+        with measure_time() as e2e_time:
+            session._run_query(sql)
+        # Add the time taken to describe the dataframe to query history
+        if dataframe_uuid:
+            session.dataframe_profiler.add_describe_query_time(
+                dataframe_uuid, sql, e2e_time()
+            )
+
         return convert_result_meta_to_attribute(
             session._conn._cursor.description, session._conn.max_string_size
         )
 
-    # collect describe query details for telemetry
+    # collect describe query details for telemetry and dataframe profiling
    stack = traceback.extract_stack(limit=10)[:-1]
     stack_trace = [frame.line for frame in stack] if len(stack) > 0 else None
-    start_time = time.time()
-    attributes = session._get_result_attributes(sql)
-    e2e_time = time.time() - start_time
+    with measure_time() as e2e_time:
+        attributes = session._get_result_attributes(sql)
     session._conn._telemetry_client.send_describe_query_details(
-        session._session_id, sql, e2e_time, stack_trace
+        session._session_id, sql, e2e_time(), stack_trace
     )
+    if dataframe_uuid:
+        session.dataframe_profiler.add_describe_query_time(
+            dataframe_uuid, sql, e2e_time()
+        )
 
     return attributes
 
 
 @ttl_cache(ttl_seconds=15)
 def cached_analyze_attributes(
-    sql: str, session: "snowflake.snowpark.session.Session"  # type: ignore
+    sql: str, session: "snowflake.snowpark.session.Session", dataframe_uuid: Optional[str] = None  # type: ignore
 ) -> List[Attribute]:
-    return analyze_attributes(sql, session)
+    return analyze_attributes(sql, session, dataframe_uuid)
 
 
 def convert_result_meta_to_attribute(
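This file (and the others below) rely on measure_time() from snowflake.snowpark._internal.utils, whose definition is not part of this diff. Here is a minimal sketch of a compatible helper, inferred from the call sites above; the real implementation may differ:

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def measure_time() -> Iterator[Callable[[], float]]:
    """Yield a callable that returns the elapsed wall-clock time in seconds."""
    start = time.perf_counter()
    end = None

    def elapsed() -> float:
        # Running duration while the block is active; fixed once it exits.
        return (end if end is not None else time.perf_counter()) - start

    try:
        yield elapsed
    finally:
        end = time.perf_counter()
```

Yielding a callable instead of a plain float is what lets e2e_time() be read after the with block exits, which is exactly how the diff forwards the duration to telemetry and to the dataframe profiler.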

src/snowflake/snowpark/_internal/analyzer/select_statement.py

Lines changed: 3 additions & 3 deletions
@@ -642,9 +642,9 @@ def children_plan_nodes(self) -> List[Union["Selectable", SnowflakePlan]]:
 
 @SnowflakePlan.Decorator.wrap_exception
 def _analyze_attributes(
-    sql: str, session: "snowflake.snowpark.session.Session"  # type: ignore
+    sql: str, session: "snowflake.snowpark.session.Session", dataframe_uuid: Optional[str] = None  # type: ignore
 ) -> List[Attribute]:
-    return analyze_attributes(sql, session)
+    return analyze_attributes(sql, session, dataframe_uuid)
 
 
 class SelectSQL(Selectable):
@@ -677,7 +677,7 @@ def __init__(
                 self.pre_actions[0].query_id_place_holder
             )
             self._schema_query = analyzer_utils.schema_value_statement(
-                _analyze_attributes(sql, self._session)
+                _analyze_attributes(sql, self._session, self._uuid)
             )  # Change to subqueryable schema query so downstream query plan can describe the SQL
             self._query_param = None
         else:

src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py

Lines changed: 4 additions & 2 deletions
@@ -588,9 +588,9 @@ def _analyze_attributes(self) -> List[Attribute]:
             self.schema_query is not None
         ), "No schema query is available for the SnowflakePlan"
         if self.session.reduce_describe_query_enabled:
-            return cached_analyze_attributes(self.schema_query, self.session)
+            return cached_analyze_attributes(self.schema_query, self.session, self.uuid)
         else:
-            return analyze_attributes(self.schema_query, self.session)
+            return analyze_attributes(self.schema_query, self.session, self.uuid)
 
     @property
     def attributes(self) -> List[Attribute]:
@@ -725,6 +725,8 @@ def set_last_query_line_intervals(self) -> None:
             self.uuid,
         )
         final_sql = remove_comments(last_query.sql, child_uuids)
+        if self.schema_query:
+            self.schema_query = remove_comments(self.schema_query, child_uuids)
         last_query.sql = final_sql
         last_query.query_line_intervals = query_line_intervals

src/snowflake/snowpark/_internal/compiler/plan_compiler.py

Lines changed: 64 additions & 69 deletions
@@ -4,7 +4,6 @@
 
 import copy
 import logging
-import time
 from typing import Any, Dict, List
 
 from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import (
@@ -30,7 +29,7 @@
     plot_plan_if_enabled,
 )
 from snowflake.snowpark._internal.telemetry import TelemetryField
-from snowflake.snowpark._internal.utils import random_name_for_temp_object
+from snowflake.snowpark._internal.utils import measure_time, random_name_for_temp_object
 from snowflake.snowpark.mock._connection import MockServerConnection
 
 _logger = logging.getLogger(__name__)
@@ -90,79 +89,75 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
         if self.should_start_query_compilation():
             session = self._plan.session
             try:
-                # preparation for compilation
-                # 1. make a copy of the original plan
-                start_time = time.time()
-                complexity_score_before_compilation = get_complexity_score(self._plan)
-                logical_plans: List[LogicalPlan] = [copy.deepcopy(self._plan)]
-                plot_plan_if_enabled(self._plan, "original_plan")
-                plot_plan_if_enabled(logical_plans[0], "deep_copied_plan")
-                deep_copy_end_time = time.time()
-
-                # 2. create a code generator with the original plan
-                query_generator = create_query_generator(self._plan)
-
-                extra_optimization_status: Dict[str, Any] = {}
-                # 3. apply each optimizations if needed
-                # CTE optimization
-                cte_start_time = time.time()
-                if session.cte_optimization_enabled:
-                    repeated_subquery_eliminator = RepeatedSubqueryElimination(
-                        logical_plans, query_generator
-                    )
-                    elimination_result = repeated_subquery_eliminator.apply()
-                    logical_plans = elimination_result.logical_plans
-                    # add the extra repeated subquery elimination status
-                    extra_optimization_status[
-                        CompilationStageTelemetryField.CTE_NODE_CREATED.value
-                    ] = elimination_result.total_num_of_ctes
-
-                cte_end_time = time.time()
-                complexity_scores_after_cte = [
-                    get_complexity_score(logical_plan) for logical_plan in logical_plans
-                ]
-                for i, plan in enumerate(logical_plans):
-                    plot_plan_if_enabled(plan, f"cte_optimized_plan_{i}")
-
-                # Large query breakdown
-                breakdown_summary, skipped_summary = {}, {}
-                if session.large_query_breakdown_enabled:
-                    large_query_breakdown = LargeQueryBreakdown(
-                        session,
-                        query_generator,
-                        logical_plans,
-                        session.large_query_breakdown_complexity_bounds,
-                    )
-                    breakdown_result = large_query_breakdown.apply()
-                    logical_plans = breakdown_result.logical_plans
-                    breakdown_summary = breakdown_result.breakdown_summary
-                    skipped_summary = breakdown_result.skipped_summary
-
-                large_query_breakdown_end_time = time.time()
-                complexity_scores_after_large_query_breakdown = [
-                    get_complexity_score(logical_plan) for logical_plan in logical_plans
-                ]
-                for i, plan in enumerate(logical_plans):
-                    plot_plan_if_enabled(plan, f"large_query_breakdown_plan_{i}")
-
-                # 4. do a final pass of code generation
-                queries = query_generator.generate_queries(logical_plans)
+                with measure_time() as total_time:
+                    # preparation for compilation
+                    # 1. make a copy of the original plan
+                    with measure_time() as deep_copy_time:
+                        complexity_score_before_compilation = get_complexity_score(
+                            self._plan
+                        )
+                        logical_plans: List[LogicalPlan] = [copy.deepcopy(self._plan)]
+                        plot_plan_if_enabled(self._plan, "original_plan")
+                        plot_plan_if_enabled(logical_plans[0], "deep_copied_plan")
+
+                    # 2. create a code generator with the original plan
+                    query_generator = create_query_generator(self._plan)
+
+                    extra_optimization_status: Dict[str, Any] = {}
+                    # 3. apply each optimizations if needed
+                    # CTE optimization
+                    with measure_time() as cte_time:
+                        if session.cte_optimization_enabled:
+                            repeated_subquery_eliminator = RepeatedSubqueryElimination(
+                                logical_plans, query_generator
+                            )
+                            elimination_result = repeated_subquery_eliminator.apply()
+                            logical_plans = elimination_result.logical_plans
+                            # add the extra repeated subquery elimination status
+                            extra_optimization_status[
+                                CompilationStageTelemetryField.CTE_NODE_CREATED.value
+                            ] = elimination_result.total_num_of_ctes
+                    complexity_scores_after_cte = [
+                        get_complexity_score(logical_plan)
+                        for logical_plan in logical_plans
+                    ]
+                    for i, plan in enumerate(logical_plans):
+                        plot_plan_if_enabled(plan, f"cte_optimized_plan_{i}")
+
+                    # Large query breakdown
+                    breakdown_summary, skipped_summary = {}, {}
+                    with measure_time() as large_query_breakdown_time:
+                        if session.large_query_breakdown_enabled:
+                            large_query_breakdown = LargeQueryBreakdown(
+                                session,
+                                query_generator,
+                                logical_plans,
+                                session.large_query_breakdown_complexity_bounds,
+                            )
+                            breakdown_result = large_query_breakdown.apply()
+                            logical_plans = breakdown_result.logical_plans
+                            breakdown_summary = breakdown_result.breakdown_summary
+                            skipped_summary = breakdown_result.skipped_summary
+
+                    complexity_scores_after_large_query_breakdown = [
+                        get_complexity_score(logical_plan)
+                        for logical_plan in logical_plans
+                    ]
+                    for i, plan in enumerate(logical_plans):
+                        plot_plan_if_enabled(plan, f"large_query_breakdown_plan_{i}")
+
+                    # 4. do a final pass of code generation
+                    queries = query_generator.generate_queries(logical_plans)
 
                 # log telemetry data
-                deep_copy_time = deep_copy_end_time - start_time
-                cte_time = cte_end_time - cte_start_time
-                large_query_breakdown_time = (
-                    large_query_breakdown_end_time - cte_end_time
-                )
-                total_time = time.time() - start_time
                 summary_value = {
                     TelemetryField.CTE_OPTIMIZATION_ENABLED.value: session.cte_optimization_enabled,
                     TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: session.large_query_breakdown_enabled,
                     CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: session.large_query_breakdown_complexity_bounds,
-                    CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time,
-                    CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time,
-                    CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time,
-                    CompilationStageTelemetryField.TIME_TAKEN_FOR_LARGE_QUERY_BREAKDOWN.value: large_query_breakdown_time,
+                    CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time(),
+                    CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time(),
+                    CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time(),
+                    CompilationStageTelemetryField.TIME_TAKEN_FOR_LARGE_QUERY_BREAKDOWN.value: large_query_breakdown_time(),
                     CompilationStageTelemetryField.COMPLEXITY_SCORE_BEFORE_COMPILATION.value: complexity_score_before_compilation,
                     CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_CTE_OPTIMIZATION.value: complexity_scores_after_cte,
                     CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_LARGE_QUERY_BREAKDOWN.value: complexity_scores_after_large_query_breakdown,
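The compiler now nests these timing scopes, and each stage's duration stays readable after its block closes. A toy run of the pattern, reusing the hypothetical measure_time sketch shown earlier (the sleeps merely stand in for the real optimization passes):

```python
import time

with measure_time() as total_time:
    with measure_time() as cte_time:
        time.sleep(0.1)  # stand-in for CTE optimization
    with measure_time() as breakdown_time:
        time.sleep(0.2)  # stand-in for large query breakdown

# All three durations remain readable after their blocks exit.
print(f"cte={cte_time():.2f}s breakdown={breakdown_time():.2f}s total={total_time():.2f}s")
```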

src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py

Lines changed: 20 additions & 18 deletions
@@ -2,7 +2,6 @@
 # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
 #
 from enum import Enum
-import time
 import datetime
 from typing import List, Callable, Any, Optional, TYPE_CHECKING
 from snowflake.connector.options import pandas as pd
@@ -12,8 +11,11 @@
     Connection,
     Cursor,
 )
-from snowflake.snowpark._internal.utils import generate_random_alphanumeric
-from snowflake.snowpark._internal.utils import get_sorted_key_for_version
+from snowflake.snowpark._internal.utils import (
+    generate_random_alphanumeric,
+    get_sorted_key_for_version,
+    measure_time,
+)
 from snowflake.snowpark.exceptions import SnowparkDataframeReaderException
 from snowflake.snowpark.types import (
     StructType,
@@ -141,21 +143,21 @@ def udtf_ingestion(
         from snowflake.snowpark._internal.data_source.utils import UDTF_PACKAGE_MAP
 
         udtf_name = f"data_source_udtf_{generate_random_alphanumeric(5)}"
-        start = time.perf_counter()
-        session.udtf.register(
-            self.udtf_class_builder(fetch_size=fetch_size, schema=schema),
-            name=udtf_name,
-            output_schema=StructType(
-                [
-                    StructField(field.name, VariantType(), field.nullable)
-                    for field in schema.fields
-                ]
-            ),
-            external_access_integrations=[external_access_integrations],
-            packages=packages or UDTF_PACKAGE_MAP.get(self.dbms_type),
-            imports=imports,
-        )
-        logger.debug(f"register ingestion udtf takes: {time.time() - start} seconds")
+        with measure_time() as udtf_register_time:
+            session.udtf.register(
+                self.udtf_class_builder(fetch_size=fetch_size, schema=schema),
+                name=udtf_name,
+                output_schema=StructType(
+                    [
+                        StructField(field.name, VariantType(), field.nullable)
+                        for field in schema.fields
+                    ]
+                ),
+                external_access_integrations=[external_access_integrations],
+                packages=packages or UDTF_PACKAGE_MAP.get(self.dbms_type),
+                imports=imports,
+            )
+        logger.debug(f"register ingestion udtf takes: {udtf_register_time()} seconds")
         call_udtf_sql = f"""
         select * from {partition_table}, table({udtf_name}({PARTITION_TABLE_COLUMN_NAME}))
         """
