
Commit 67964ff

Merge branch 'main' into aalam-SNOW-2257191-cte-join-bugfix

2 parents: 7209d87 + c5f1816

29 files changed: +969 −62 lines

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
@@ -10,10 +10,15 @@
 - Added support for PrPr feature `Session.client_telemetry`.
 - Added support for `Session.udf_profiler`.
 - Added support for `functions.ai_translate`.
+- Added support for the following `iceberg_config` options in `DataFrameWriter.save_as_table` and `DataFrame.copy_into_table`:
+  - `target_file_size`
+  - `partition_by`
 - Added support for the following functions in `functions.py`:
   - String and Binary functions:
     - `base64_decode_binary`
+    - `bucket`
     - `compress`
+    - `day`
     - `decompress_binary`
     - `decompress_string`
     - `md5_binary`
@@ -23,6 +28,7 @@
     - `sha2_binary`
     - `soundex_p123`
     - `strtok`
+    - `truncate`
     - `try_base64_decode_binary`
     - `try_base64_decode_string`
     - `try_hex_decode_binary`
@@ -43,6 +49,10 @@
     - `square`
     - `width_bucket`

+#### Bug Fixes
+
+- Fixed a bug where automatically-generated temporary objects were not properly cleaned up.
+
 #### Improvements

 - Enhanced `DataFrame.sort()` to support `ORDER BY ALL` when no columns are specified.
@@ -56,6 +66,7 @@
 #### Bug Fixes

 - Fixed a bug in SQL generation when joining two `DataFrame`s created using `DataFrame.alias` while CTE optimization is enabled.
+- Fixed a bug in `XMLReader` where finding the start position of a row tag could return an incorrect file position.

 ### Snowpark pandas API Updates

@@ -64,6 +75,9 @@
 - Added support for `Dataframe.groupby.rolling()`.
 - Added support for mapping `np.percentile` with DataFrame and Series inputs to `Series.quantile`.
 - Added support for setting the `random_state` parameter to an integer when calling `DataFrame.sample` or `Series.sample`.
+- Added support for the following `iceberg_config` options in `to_iceberg`:
+  - `target_file_size`
+  - `partition_by`

 #### Improvements

docs/source/snowpark/functions.rst

Lines changed: 3 additions & 0 deletions
@@ -120,6 +120,7 @@ Functions
     boolor
     boolxor
     boolxor_agg
+    bucket
     build_stage_file_url
     builtin
     bround
@@ -189,6 +190,7 @@ Functions
     datediff
     date_add
     date_sub
+    day
     daydiff
     dayname
     dayofmonth
@@ -555,6 +557,7 @@ Functions
     translate
     trim
     trunc
+    truncate
     try_cast
     try_parse_json
     try_to_binary

src/snowflake/snowpark/_internal/analyzer/analyzer.py

Lines changed: 49 additions & 3 deletions
@@ -4,7 +4,7 @@
 #
 import uuid
 from collections import Counter, defaultdict
-from typing import TYPE_CHECKING, DefaultDict, Dict, List, Union
+from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Union
 from logging import getLogger

 from snowflake.connector import IntegrityError
@@ -177,6 +177,7 @@
     ExprAliasUpdateDict,
 )
 from snowflake.snowpark.types import BooleanType, _NumericType
+from snowflake.snowpark.column import Column

 ARRAY_BIND_THRESHOLD = 512

@@ -904,6 +905,43 @@ def to_sql_try_avoid_cast(
             parse_local_name,
         )

+    def _process_partition_by_in_iceberg_config(
+        self,
+        iceberg_config: Optional[dict],
+        df_aliased_col_name_to_real_col_name: Union[
+            DefaultDict[str, Dict[str, str]], DefaultDict[str, ExprAliasUpdateDict]
+        ],
+    ) -> Optional[dict]:
+        """
+        Process partition_by expressions from iceberg_config, converting Column objects to SQL strings.
+        Returns a new iceberg_config dict with partition_by as a list of SQL strings, or the original config if no processing is needed.
+        """
+        if iceberg_config is None or iceberg_config.get("partition_by") is None:
+            return iceberg_config
+
+        iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
+        pb = iceberg_config["partition_by"]
+
+        # Convert to list and filter out empty expressions
+        partition_exprs = pb if isinstance(pb, (list, tuple)) else [pb]
+        partition_sqls = []
+        for expr in partition_exprs:
+            if isinstance(expr, Column):
+                partition_sqls.append(
+                    self.analyze(expr._expression, df_aliased_col_name_to_real_col_name)
+                )
+            elif isinstance(expr, str):
+                if expr:  # Ignore empty strings
+                    partition_sqls.append(str(expr))
+            else:
+                raise TypeError(
+                    f"partition_by in iceberg_config expected Column or str, got: {type(expr)}"
+                )
+
+        if partition_sqls:
+            return {**iceberg_config, "partition_by": partition_sqls}
+        return iceberg_config
+
     def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan:
         self.subquery_plans = []
         self.generated_alias_maps = (
@@ -1164,6 +1202,10 @@ def do_resolve_with_resolved_children(

         if isinstance(logical_plan, SnowflakeCreateTable):
             resolved_child = resolved_children[logical_plan.children[0]]
+            iceberg_config = self._process_partition_by_in_iceberg_config(
+                logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
+            )
+
             return self.plan_builder.save_as_table(
                 table_name=logical_plan.table_name,
                 column_names=logical_plan.column_names,
@@ -1184,7 +1226,7 @@ def do_resolve_with_resolved_children(
                 use_scoped_temp_objects=self.session._use_scoped_temp_objects,
                 creation_source=logical_plan.creation_source,
                 child_attributes=resolved_child.attributes,
-                iceberg_config=logical_plan.iceberg_config,
+                iceberg_config=iceberg_config,
                 table_exists=logical_plan.table_exists,
             )

@@ -1416,6 +1458,10 @@ def do_resolve_with_resolved_children(
         if format_name is not None:
             format_type_options["FORMAT_NAME"] = format_name
         assert logical_plan.file_format is not None
+        iceberg_config = self._process_partition_by_in_iceberg_config(
+            logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
+        )
+
         return self.plan_builder.copy_into_table(
             path=logical_plan.file_path,
             table_name=logical_plan.table_name,
@@ -1435,7 +1481,7 @@ def do_resolve_with_resolved_children(
             else None,
             user_schema=logical_plan.user_schema,
             create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
-            iceberg_config=logical_plan.iceberg_config,
+            iceberg_config=iceberg_config,
         )

         if isinstance(logical_plan, CopyIntoLocationNode):
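In practice this normalization means `partition_by` may be a single expression or a list/tuple mixing `Column` objects with raw SQL strings; a sketch of the accepted shapes (identifier and volume names are illustrative):

    from snowflake.snowpark.functions import col

    # Each entry is reduced to a SQL string by the analyzer: Column objects are
    # rendered via analyze(), non-empty strings pass through unchanged, empty
    # strings are dropped, and any other type raises TypeError.
    iceberg_config = {
        "external_volume": "my_volume",                       # hypothetical
        "base_location": "events/",
        "partition_by": [col("region"), "day(created_at)"],   # Column + raw SQL transform
    }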

src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py

Lines changed: 44 additions & 14 deletions
@@ -151,6 +151,7 @@
 EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
 CATALOG = " CATALOG "
 BASE_LOCATION = " BASE_LOCATION "
+TARGET_FILE_SIZE = " TARGET_FILE_SIZE "
 CATALOG_SYNC = " CATALOG_SYNC "
 STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
 REG_EXP = " REGEXP "
@@ -231,23 +232,34 @@ def format_uuid(uuid: Optional[str], with_new_line: bool = True) -> str:
     return f"{UUID_COMMENT.format(uuid)}"


-def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]:
+def validate_iceberg_config(
+    iceberg_config: Optional[dict],
+) -> tuple[Dict[str, str], list]:
+    """
+    Validate and process iceberg config, returning (options_dict, partition_exprs_list).
+    """
     if iceberg_config is None:
-        return dict()
+        return dict(), []

     iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}

-    return {
+    # Extract partition_by (already processed as SQL strings by the analyzer)
+    partition_exprs = iceberg_config.get("partition_by", [])
+
+    options = {
         EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
         CATALOG: iceberg_config.get("catalog", None),
         BASE_LOCATION: iceberg_config.get("base_location", None),
+        TARGET_FILE_SIZE: iceberg_config.get("target_file_size", None),
         CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
         STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
             "storage_serialization_policy", None
         ),
         ICEBERG_VERSION: iceberg_config.get("iceberg_version", None),
     }

+    return options, partition_exprs
+

 def result_scan_statement(uuid_place_holder: str) -> str:
     return (
@@ -311,6 +323,20 @@ def partition_spec(col_exprs: List[str]) -> str:
     return f"PARTITION BY {COMMA.join(col_exprs)}" if col_exprs else EMPTY_STRING


+def iceberg_partition_clause(partition_exprs: List[str]) -> str:
+    return (
+        (
+            SPACE
+            + PARTITION_BY
+            + LEFT_PARENTHESIS
+            + COMMA.join(partition_exprs)
+            + RIGHT_PARENTHESIS
+        )
+        if partition_exprs
+        else EMPTY_STRING
+    )
+
+
 def order_by_spec(col_exprs: List[str]) -> str:
     if not col_exprs:
         return EMPTY_STRING
@@ -1103,15 +1129,17 @@ def create_table_statement(
         CHANGE_TRACKING: change_tracking,
     }

-    iceberg_config = validate_iceberg_config(iceberg_config)
-    options.update(iceberg_config)
+    iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config)
+    options.update(iceberg_options)
     options_statement = get_options_statement(options)

+    partition_by_clause = iceberg_partition_clause(partition_exprs)
+
     return (
         f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}"
         f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
-        f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
-        f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}"
+        f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
+        f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{partition_by_clause}{cluster_by_clause}"
         f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}"
     )

@@ -1192,15 +1220,18 @@ def create_table_as_select_statement(
         MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
         CHANGE_TRACKING: change_tracking,
     }
-    iceberg_config = validate_iceberg_config(iceberg_config)
-    options.update(iceberg_config)
+    iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config)
+    options.update(iceberg_options)
     options_statement = get_options_statement(options)
+
+    partition_by_clause = iceberg_partition_clause(partition_exprs)
+
     return (
         f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
         f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
-        f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
+        f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}"
         f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
-        f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
+        f"{table_name}{column_definition_sql}{partition_by_clause}{cluster_by_clause}{options_statement}"
         f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
     )

@@ -1506,9 +1537,8 @@ def create_or_replace_dynamic_table_statement(
         }
     )

-    iceberg_options = get_options_statement(
-        validate_iceberg_config(iceberg_config)
-    ).strip()
+    iceberg_options, _ = validate_iceberg_config(iceberg_config)
+    iceberg_options = get_options_statement(iceberg_options).strip()

     return (
         f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}{TRANSIENT if is_transient else EMPTY_STRING}"

src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py

Lines changed: 2 additions & 0 deletions
@@ -1298,6 +1298,8 @@ def save_as_table(
             the Iceberg table stores its metadata files and data in Parquet format
         catalog: specifies either Snowflake or a catalog integration to use for this table
         base_location: the base directory that snowflake can write iceberg metadata and files to
+        target_file_size: specifies a target Parquet file size for the table.
+            Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
         catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
         storage_serialization_policy: specifies the storage serialization policy for the table
         iceberg_version: Overrides the version of iceberg to use. Defaults to 2 when unset.
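The same options flow through the copy-into path as well; a hedged sketch (stage, table, and volume names are hypothetical, and `session` is assumed to exist):

    from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType

    schema = StructType([
        StructField("id", IntegerType()),
        StructField("region", StringType()),
    ])

    # Reader-based DataFrame over a staged path (hypothetical stage).
    df = session.read.schema(schema).csv("@my_stage/events/")
    df.copy_into_table(
        "events_iceberg",
        iceberg_config={
            "external_volume": "my_volume",
            "base_location": "events/",
            "target_file_size": "AUTO",   # 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
            "partition_by": ["region"],
        },
    )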

src/snowflake/snowpark/_internal/compiler/plan_compiler.py

Lines changed: 4 additions & 4 deletions
@@ -216,10 +216,10 @@ def replace_temp_obj_placeholders(
                     placeholder_name,
                     temp_obj_type,
                 ) = query.temp_obj_name_placeholder
-                placeholders[placeholder_name] = random_name_for_temp_object(
-                    temp_obj_type
-                )
-
+                if placeholder_name not in placeholders:
+                    placeholders[placeholder_name] = random_name_for_temp_object(
+                        temp_obj_type
+                    )
             copied_query = copy.copy(query)
             for placeholder_name, target_temp_name in placeholders.items():
                 # Copy the original query and replace all the placeholder names with the
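Why the added membership check matters: one plan can carry several queries that reference the same placeholder, and every occurrence must resolve to a single generated name. A dependency-free sketch of the idea (helper and placeholder names are illustrative):

    import uuid

    def random_name_for_temp_object(obj_type: str) -> str:  # stand-in for the real helper
        return f"SNOWPARK_TEMP_{obj_type}_{uuid.uuid4().hex.upper()}"

    placeholders: dict = {}
    # Two queries sharing one placeholder, e.g. a CREATE and a later SELECT.
    queries = [("PH_1", "TABLE"), ("PH_1", "TABLE")]
    for placeholder_name, temp_obj_type in queries:
        if placeholder_name not in placeholders:  # generate once, reuse afterwards
            placeholders[placeholder_name] = random_name_for_temp_object(temp_obj_type)

    assert len(set(placeholders.values())) == 1  # both queries hit the same temp table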

src/snowflake/snowpark/_internal/event_table_telemetry.py

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,6 @@
 from logging import getLogger
 from typing import Dict, Optional, Tuple
 from snowflake.connector.options import MissingOptionalDependency, ModuleLikeObject
-from snowflake.connector.wif_util import create_attestation

 import snowflake.snowpark
 import requests
@@ -320,6 +319,8 @@ def disable_event_table_telemetry_collection(self) -> None:
         self._disable_logger_provider()

     def _get_external_telemetry_auth_token(self) -> Dict:
+        from snowflake.connector.wif_util import create_attestation
+
         self._attestation = create_attestation(
             self.session.connection.auth_class.provider,
             self.session.connection.auth_class.entra_resource,
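Moving the import into the method defers the `wif_util` dependency until token-fetch time, so merely importing this module no longer fails on connector builds that lack it (my assumption about the motivation). The deferred-import pattern, sketched with a hypothetical fallback:

    def _get_token_sketch():
        # Imported lazily: a module-level import would raise ImportError at import
        # time on snowflake-connector-python versions without wif_util (assumption).
        try:
            from snowflake.connector.wif_util import create_attestation
        except ImportError as exc:
            raise RuntimeError("workload identity support unavailable") from exc
        return create_attestation  # caller invokes it with provider details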

src/snowflake/snowpark/_internal/xml_reader.py

Lines changed: 0 additions & 7 deletions
@@ -205,10 +205,6 @@ def find_next_opening_tag_pos(
         chunk = file_obj.read(current_chunk_size)
         if not chunk:
             raise EOFError("Reached end of file before finding opening tag")
-        # If the chunk is smaller than expected, we are near the end.
-        if len(chunk) < current_chunk_size:
-            if chunk.find(tag_start_1) == -1 and chunk.find(tag_start_2) == -1:
-                raise EOFError("Reached end of file before finding opening tag")

         # Combine leftover from previous read with the new chunk.
         data = overlap + chunk
@@ -233,9 +229,6 @@ def find_next_opening_tag_pos(
         # Update the overlap from the end of the combined data.
         overlap = data[-overlap_size:] if len(data) >= overlap_size else data

-        # Otherwise, rewind by the length of the overlap so that a tag spanning the boundary isn't missed.
-        file_obj.seek(-len(overlap), 1)
-
         # Check that progress is being made to avoid infinite loops.
         if file_obj.tell() <= pos_before:
             raise EOFError("No progress made while searching for opening tag")
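The removed `seek(-len(overlap), 1)` is what could leave `file_obj.tell()` pointing at the wrong byte, the `XMLReader` position bug noted in the changelog. Keeping the overlap purely in memory never moves the file pointer backwards; a simplified, single-tag sketch of that approach (not the reader's actual implementation):

    import io

    def find_tag_pos(file_obj: io.BufferedIOBase, tag: bytes, chunk_size: int = 8192) -> int:
        """Return the absolute offset of `tag`, carrying a byte overlap across
        chunks in memory instead of seeking the file backwards."""
        start = file_obj.tell()  # where the scan begins
        consumed = 0             # bytes read so far
        overlap = b""
        while True:
            chunk = file_obj.read(chunk_size)
            if not chunk:
                raise EOFError("Reached end of file before finding opening tag")
            data = overlap + chunk
            idx = data.find(tag)
            if idx != -1:
                # `data` begins at file offset start + consumed - len(overlap)
                return start + consumed - len(overlap) + idx
            consumed += len(chunk)
            # keep just enough trailing bytes to catch a tag split across the boundary
            overlap = data[-(len(tag) - 1):] if len(tag) > 1 else b""

    # e.g. find_tag_pos(io.BytesIO(b"abc<row>x</row>"), b"<row>") -> 3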
