|
2 | 2 | # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. |
3 | 3 | # |
4 | 4 |
|
| 5 | +import datetime |
| 6 | +import os |
| 7 | +import tempfile |
| 8 | + |
| 9 | +import pyarrow as pa |
| 10 | +import pyarrow.parquet as pq |
5 | 11 | import pytest |
| 12 | + |
| 13 | +from snowflake.snowpark._internal.utils import TempObjectType |
6 | 14 | from snowflake.snowpark.functions import col, fl_get_file_type |
7 | | -from snowflake.snowpark.types import FileType |
| 15 | +from snowflake.snowpark.types import FileType, TimestampType |
8 | 16 | from tests.utils import Utils, TestFiles |
9 | 17 |
|
10 | 18 |
|
@@ -356,3 +364,90 @@ def test_file_pattern_escape_single_quotes(session, resources_path): |
356 | 364 |
|
357 | 365 | finally: |
358 | 366 | Utils.drop_stage(session, test_stage) |
| 367 | + |
| 368 | + |
@pytest.mark.skipif(
    "config.getoption('local_testing_mode', default=False)",
    reason="FEAT: parquet not supported",
)
def test_parquet_pattern_infer_with_metadata_files(session):
    """When a stage contains both .parquet data files and _common_metadata
    files with a mismatched schema, reading with PATTERN should correctly
    infer timestamp columns instead of falling back to VARIANT."""
    stage_name = Utils.random_name_for_temp_object(TempObjectType.STAGE)

    ts1 = datetime.datetime(2024, 1, 15, 10, 30, 0, 123456)
    ts2 = datetime.datetime(2024, 6, 20, 14, 45, 30, 789012)

    # The real data file carries a tz-aware timestamp column.
    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
            pa.field("updated_time", pa.timestamp("us", tz="UTC")),
        ]
    )

    table = pa.table(
        {
            "id": [1, 2],
            "name": ["Alice", "Bob"],
            "updated_time": [ts1, ts2],
        },
        schema=arrow_schema,
    )

    # The _common_metadata file deliberately advertises a conflicting
    # (tz-naive) timestamp type to reproduce the mismatch scenario.
    bad_arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
            pa.field("updated_time", pa.timestamp("us")),
        ]
    )

    try:
        Utils.create_stage(session, stage_name, is_temporary=True)

        with tempfile.TemporaryDirectory() as tmpdir:
            data_path = os.path.join(tmpdir, "data.parquet")
            pq.write_table(table, data_path)

            # Schema.empty_table() replaces the hand-built dict of empty
            # arrays: same zero-row file, less code to get wrong.
            meta_path = os.path.join(tmpdir, "_common_metadata")
            pq.write_table(bad_arrow_schema.empty_table(), meta_path)

            # Upload both files to the same stage subdirectory.
            for local_path in (data_path, meta_path):
                session.file.put(
                    local_path,
                    f"@{stage_name}/subdir",
                    auto_compress=False,
                    overwrite=True,
                )

            df = session.read.option("PATTERN", ".*[.]parquet").parquet(
                f"@{stage_name}/subdir"
            )

            # next(..., None) + an explicit assert gives a clear failure
            # message instead of an opaque IndexError from [...][0].
            ts_field = next(
                (
                    f
                    for f in df.schema.fields
                    if f.name.strip('"').upper() == "UPDATED_TIME"
                ),
                None,
            )
            assert (
                ts_field is not None
            ), "UPDATED_TIME column missing from inferred schema"
            assert isinstance(
                ts_field.datatype, TimestampType
            ), f"Expected TimestampType, got {ts_field.datatype}"

            # Only the two rows from data.parquet should come back; the
            # metadata file must not contribute rows.
            rows = df.collect()
            assert len(rows) == 2

    finally:
        Utils.drop_stage(session, stage_name)
0 commit comments