Skip to content

Commit 9f525c7

Browse files
SNOW-1570704: add open telemetry for cache_result (#2583)
1 parent 2b153da commit 9f525c7

File tree

3 files changed

+51
-32
lines changed

3 files changed

+51
-32
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
- When target stage is not set in profiler, a default stage from `Session.get_session_stage` is used instead of raising `SnowparkSQLException`.
1616
- Allowed lower case or mixed case input when calling `Session.stored_procedure_profiler.set_active_profiler`.
17+
- Added distributed tracing using open telemetry APIs for action function in `DataFrame`:
18+
- `cache_result`
1719

1820
#### Bug Fixes
1921

src/snowflake/snowpark/dataframe.py

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4057,42 +4057,43 @@ def cache_result(
40574057
A :class:`Table` object that holds the cached result in a temporary table.
40584058
All operations on this new DataFrame have no effect on the original.
40594059
"""
4060-
from snowflake.snowpark.mock._connection import MockServerConnection
4060+
with open_telemetry_context_manager(self.cache_result, self):
4061+
from snowflake.snowpark.mock._connection import MockServerConnection
40614062

4062-
temp_table_name = self._session.get_fully_qualified_name_if_possible(
4063-
f'"{random_name_for_temp_object(TempObjectType.TABLE)}"'
4064-
)
4063+
temp_table_name = self._session.get_fully_qualified_name_if_possible(
4064+
f'"{random_name_for_temp_object(TempObjectType.TABLE)}"'
4065+
)
40654066

4066-
if isinstance(self._session._conn, MockServerConnection):
4067-
self.write.save_as_table(temp_table_name, create_temp_table=True)
4068-
else:
4069-
df = self._with_plan(
4070-
SnowflakeCreateTable(
4071-
[temp_table_name],
4072-
None,
4073-
SaveMode.ERROR_IF_EXISTS,
4074-
self._plan,
4075-
creation_source=TableCreationSource.CACHE_RESULT,
4076-
table_type="temp",
4067+
if isinstance(self._session._conn, MockServerConnection):
4068+
self.write.save_as_table(temp_table_name, create_temp_table=True)
4069+
else:
4070+
df = self._with_plan(
4071+
SnowflakeCreateTable(
4072+
[temp_table_name],
4073+
None,
4074+
SaveMode.ERROR_IF_EXISTS,
4075+
self._plan,
4076+
creation_source=TableCreationSource.CACHE_RESULT,
4077+
table_type="temp",
4078+
)
40774079
)
4080+
statement_params_for_cache_result = {
4081+
**(statement_params or self._statement_params or {}),
4082+
"cache_result_temp_table": temp_table_name,
4083+
}
4084+
self._session._conn.execute(
4085+
df._plan,
4086+
_statement_params=create_or_update_statement_params_with_query_tag(
4087+
statement_params_for_cache_result,
4088+
self._session.query_tag,
4089+
SKIP_LEVELS_TWO,
4090+
),
4091+
)
4092+
cached_df = snowflake.snowpark.table.Table(
4093+
temp_table_name, self._session, is_temp_table_for_cleanup=True
40784094
)
4079-
statement_params_for_cache_result = {
4080-
**(statement_params or self._statement_params or {}),
4081-
"cache_result_temp_table": temp_table_name,
4082-
}
4083-
self._session._conn.execute(
4084-
df._plan,
4085-
_statement_params=create_or_update_statement_params_with_query_tag(
4086-
statement_params_for_cache_result,
4087-
self._session.query_tag,
4088-
SKIP_LEVELS_TWO,
4089-
),
4090-
)
4091-
cached_df = snowflake.snowpark.table.Table(
4092-
temp_table_name, self._session, is_temp_table_for_cleanup=True
4093-
)
4094-
cached_df.is_cached = True
4095-
return cached_df
4095+
cached_df.is_cached = True
4096+
return cached_df
40964097

40974098
@df_collect_api_telemetry
40984099
def random_split(

tests/integ/test_open_telemetry.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,3 +530,19 @@ def test_open_telemetry_span_from_dataframe_writer(session, dict_exporter):
530530
},
531531
)
532532
assert check_tracing_span_answers(span_extractor(dict_exporter), answer)
533+
534+
535+
def test_open_telemetry_from_cache_result(session, dict_exporter):
536+
df = session.sql("select 1").cache_result()
537+
lineno = inspect.currentframe().f_lineno - 1
538+
539+
answer = (
540+
"cache_result",
541+
{
542+
"code.filepath": "test_open_telemetry.py",
543+
"code.lineno": lineno,
544+
"method.chain": "DataFrame.cache_result()",
545+
},
546+
)
547+
assert check_tracing_span_answers(span_extractor(dict_exporter), answer)
548+
assert df.collect()[0][0] == 1

0 commit comments

Comments
 (0)