Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/snowflake/snowpark/dataframe_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1628,10 +1628,20 @@ def _read_semi_structured_file(self, path: str, format: str) -> DataFrame:
new_schema,
schema_to_cast,
read_file_transformations,
_, # we don't check for error in case of infer schema failures. We use $1, Variant type
infer_schema_exception,
) = self._infer_schema_for_file_format(path, format)
if new_schema:
schema = new_schema
elif infer_schema_exception is not None:
if isinstance(infer_schema_exception, FileNotFoundError):
raise infer_schema_exception
logger.warning(
f"Could not infer schema for {format} file due to exception: "
f"{infer_schema_exception}. "
"\nFalling back to $1 VARIANT schema. "
"Please use DataFrameReader.schema() to specify a user schema for the file."
)
self._cur_options["INFER_SCHEMA"] = False

metadata_project, metadata_schema = self._get_metadata_project_and_schema()

Expand Down
92 changes: 92 additions & 0 deletions tests/unit/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
import logging
import re
from unittest import mock

Expand Down Expand Up @@ -143,6 +144,97 @@ def nop(name):
)


def _create_fake_session(sql_simplifier_enabled=True):
    """Construct a mocked-out Session with just enough wiring (connection,
    plan builder, analyzer, AST batch) for DataFrameReader unit tests to
    run without a real Snowflake connection."""
    session = mock.create_autospec(snowflake.snowpark.session.Session)
    session._conn = mock.create_autospec(ServerConnection)
    session._conn._thread_safe_session_enabled = True
    session._ast_batch = mock.create_autospec(AstBatch)

    # Feature flags the reader / plan-compilation code paths consult.
    session.sql_simplifier_enabled = sql_simplifier_enabled
    session._cte_optimization_enabled = False
    session._query_compilation_stage_enabled = False
    session._join_alias_fix = False
    session._use_scoped_temp_objects = True

    # Real plan builder and analyzer are built on top of the fake session.
    session._plan_builder = SnowflakePlanBuilder(session)
    session._analyzer = Analyzer(session)

    # Name resolution is an identity function in these tests.
    session.get_fully_qualified_name_if_possible = lambda name: name
    return session


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_generic_error(format_type, caplog):
    """A non-FileNotFoundError from schema inference must not propagate:
    the reader logs a warning, sets INFER_SCHEMA=False, and still returns
    a DataFrame using the $1 VARIANT fallback schema."""
    inference_error = RuntimeError("Cannot infer schema: error 100069")

    fake_session = _create_fake_session()
    reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

    patched_infer = mock.patch(
        "snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
        # Mirror _infer_schema_for_file_format's 4-tuple return shape:
        # (schema, schema_to_cast, transformations, exception).
        lambda *args, **kwargs: (None, None, None, inference_error),
    )
    with patched_infer, caplog.at_level(logging.WARNING):
        df = getattr(reader, format_type)("@stage/file")

    assert df is not None
    # The warning must identify the format, surface the original error,
    # and mention the VARIANT fallback.
    assert f"Could not infer schema for {format_type.upper()} file" in caplog.text
    assert "100069" in caplog.text
    assert "Falling back to $1 VARIANT schema" in caplog.text


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_file_not_found(format_type):
    """A FileNotFoundError from schema inference is not swallowed by the
    VARIANT fallback — the reader re-raises it directly to the caller."""
    missing_stage_error = FileNotFoundError(
        "Stage path does not exist or not authorized"
    )

    def fake_infer(*args, **kwargs):
        # Mirror _infer_schema_for_file_format's 4-tuple return shape.
        return None, None, None, missing_stage_error

    reader = DataFrameReader(_create_fake_session()).option("INFER_SCHEMA", True)

    with mock.patch(
        "snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
        fake_infer,
    ), pytest.raises(FileNotFoundError, match="not authorized"):
        getattr(reader, format_type)("@stage/file")


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_success_no_warning(format_type, caplog):
    """Successful schema inference produces a DataFrame without emitting
    the VARIANT-fallback warning."""
    inferred_schema = [Attribute('"col1"', StringType())]
    cast_exprs = [("$1:col1::VARCHAR", "col1")]

    def fake_infer(*args, **kwargs):
        # (schema, schema_to_cast, transformations, exception)
        return inferred_schema, cast_exprs, None, None

    reader = DataFrameReader(_create_fake_session()).option("INFER_SCHEMA", True)

    with mock.patch(
        "snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
        fake_infer,
    ), caplog.at_level(logging.WARNING):
        df = getattr(reader, format_type)("@stage/file")

    assert df is not None
    assert "Could not infer schema" not in caplog.text


def test_select_negative():
AST_ENABLED = False
set_ast_state(AstFlagSource.TEST, AST_ENABLED)
Expand Down
Loading