Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/snowflake/snowpark/dataframe_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,14 +1624,25 @@ def _read_semi_structured_file(self, path: str, format: str) -> DataFrame:
use_user_schema = True

elif self._infer_schema:
(
new_schema,
schema_to_cast,
read_file_transformations,
_, # we don't check for error in case of infer schema failures. We use $1, Variant type
) = self._infer_schema_for_file_format(path, format)
if new_schema:
schema = new_schema
if not isinstance(self._session._conn, MockServerConnection):
(
new_schema,
schema_to_cast,
read_file_transformations,
infer_schema_exception,
) = self._infer_schema_for_file_format(path, format)
if new_schema:
schema = new_schema
elif infer_schema_exception is not None:
if isinstance(infer_schema_exception, FileNotFoundError):
raise infer_schema_exception
logger.warning(
f"Could not infer schema for {format} file due to exception: "
f"{infer_schema_exception}. "
"\nFalling back to $1 VARIANT schema. "
"Please use DataFrameReader.schema() to specify a user schema for the file."
)
self._cur_options["INFER_SCHEMA"] = False

metadata_project, metadata_schema = self._get_metadata_project_and_schema()

Expand Down
8 changes: 3 additions & 5 deletions tests/integ/modin/io/test_read_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pytest

import snowflake.snowpark.modin.plugin # noqa: F401
from snowflake.snowpark.exceptions import SnowparkSQLException
from tests.integ.modin.utils import assert_frame_equal
from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker
from tests.utils import Utils
Expand Down Expand Up @@ -294,9 +295,6 @@ def test_read_json_staged_folder():


@sql_count_checker(query_count=4)
@pytest.mark.xfail(
reason="SNOW-1336174: Remove xfail by handling empty JSON files", strict=True
)
def test_read_json_empty_file():
with open("test_read_json_empty_file.json", "w"):
pass
Expand All @@ -307,13 +305,13 @@ def test_read_json_empty_file():
os.remove("test_read_json_empty_file.json")


@sql_count_checker(query_count=3)
@sql_count_checker(query_count=4)
def test_read_json_malformed_file_negative():
    """Reading a malformed JSON file should raise SnowparkSQLException."""
    file_name = "test_read_json_malformed_file.json"

    # Write deliberately invalid JSON (bare keys, a key with no value).
    with open(file_name, "w") as fh:
        fh.write("{a: 3, key_no_value}")

    with pytest.raises(SnowparkSQLException):
        pd.read_json(file_name)

    os.remove(file_name)
Expand Down
92 changes: 92 additions & 0 deletions tests/unit/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
import logging
import re
from unittest import mock

Expand Down Expand Up @@ -143,6 +144,97 @@ def nop(name):
)


def _create_fake_session(sql_simplifier_enabled=True):
    """Construct a mocked-out Session suitable for DataFrameReader unit tests.

    The session is an autospec'd mock with the feature flags and
    collaborators (connection, plan builder, analyzer, AST batch) that
    DataFrameReader code paths touch, so no live connection is needed.
    """

    def identity(name):
        # Stand-in for name qualification: return the input unchanged.
        return name

    session = mock.create_autospec(snowflake.snowpark.session.Session)
    session.sql_simplifier_enabled = sql_simplifier_enabled
    session._cte_optimization_enabled = False
    session._query_compilation_stage_enabled = False
    session._join_alias_fix = False
    session._conn = mock.create_autospec(ServerConnection)
    session._conn._thread_safe_session_enabled = True
    # Real plan builder/analyzer instances wired onto the fake session.
    session._plan_builder = SnowflakePlanBuilder(session)
    session._analyzer = Analyzer(session)
    session._use_scoped_temp_objects = True
    session._ast_batch = mock.create_autospec(AstBatch)
    session.get_fully_qualified_name_if_possible = identity
    return session


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_generic_error(format_type, caplog):
"""When _infer_schema_for_file_format returns a non-FileNotFoundError,
the reader should log a warning, set INFER_SCHEMA=False, and still
return a DataFrame with the $1 VARIANT fallback schema."""
error = RuntimeError("Cannot infer schema: error 100069")

def mock_infer(*args, **kwargs):
return None, None, None, error

fake_session = _create_fake_session()
reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

with mock.patch(
"snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
mock_infer,
):
with caplog.at_level(logging.WARNING):
df = getattr(reader, format_type)("@stage/file")

assert df is not None
assert f"Could not infer schema for {format_type.upper()} file" in caplog.text
assert "100069" in caplog.text
assert "Falling back to $1 VARIANT schema" in caplog.text


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_file_not_found(format_type):
"""When _infer_schema_for_file_format returns a FileNotFoundError,
the reader should re-raise it directly."""
error = FileNotFoundError("Stage path does not exist or not authorized")

def mock_infer(*args, **kwargs):
return None, None, None, error

fake_session = _create_fake_session()
reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

with mock.patch(
"snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
mock_infer,
):
with pytest.raises(FileNotFoundError, match="not authorized"):
getattr(reader, format_type)("@stage/file")


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_success_no_warning(format_type, caplog):
"""When _infer_schema_for_file_format succeeds, no warning should be logged
and INFER_SCHEMA should remain True."""
schema = [Attribute('"col1"', StringType())]
schema_to_cast = [("$1:col1::VARCHAR", "col1")]

def mock_infer(*args, **kwargs):
return schema, schema_to_cast, None, None

fake_session = _create_fake_session()
reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

with mock.patch(
"snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
mock_infer,
):
with caplog.at_level(logging.WARNING):
df = getattr(reader, format_type)("@stage/file")

assert df is not None
assert "Could not infer schema" not in caplog.text


def test_select_negative():
AST_ENABLED = False
set_ast_state(AstFlagSource.TEST, AST_ENABLED)
Expand Down
Loading