diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 2742331cd0..949b6c6624 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -1624,14 +1624,25 @@ def _read_semi_structured_file(self, path: str, format: str) -> DataFrame: use_user_schema = True elif self._infer_schema: - ( - new_schema, - schema_to_cast, - read_file_transformations, - _, # we don't check for error in case of infer schema failures. We use $1, Variant type - ) = self._infer_schema_for_file_format(path, format) - if new_schema: - schema = new_schema + if not isinstance(self._session._conn, MockServerConnection): + ( + new_schema, + schema_to_cast, + read_file_transformations, + infer_schema_exception, + ) = self._infer_schema_for_file_format(path, format) + if new_schema: + schema = new_schema + elif infer_schema_exception is not None: + if isinstance(infer_schema_exception, FileNotFoundError): + raise infer_schema_exception + logger.warning( + f"Could not infer schema for {format} file due to exception: " + f"{infer_schema_exception}. " + "\nFalling back to $1 VARIANT schema. " + "Please use DataFrameReader.schema() to specify a user schema for the file." 
+ ) + self._cur_options["INFER_SCHEMA"] = False metadata_project, metadata_schema = self._get_metadata_project_and_schema() diff --git a/tests/integ/modin/io/test_read_json.py b/tests/integ/modin/io/test_read_json.py index bc3879d961..0bb995ef84 100644 --- a/tests/integ/modin/io/test_read_json.py +++ b/tests/integ/modin/io/test_read_json.py @@ -16,6 +16,7 @@ import pytest import snowflake.snowpark.modin.plugin # noqa: F401 +from snowflake.snowpark.exceptions import SnowparkSQLException from tests.integ.modin.utils import assert_frame_equal from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker from tests.utils import Utils @@ -294,9 +295,6 @@ def test_read_json_staged_folder(): @sql_count_checker(query_count=4) -@pytest.mark.xfail( - reason="SNOW-1336174: Remove xfail by handling empty JSON files", strict=True -) def test_read_json_empty_file(): with open("test_read_json_empty_file.json", "w"): pass @@ -307,13 +305,13 @@ def test_read_json_empty_file(): os.remove("test_read_json_empty_file.json") -@sql_count_checker(query_count=3) +@sql_count_checker(query_count=5) def test_read_json_malformed_file_negative(): with open("test_read_json_malformed_file.json", "w") as f: f.write("{a: 3, key_no_value}") - with pytest.raises(AssertionError): + with pytest.raises(SnowparkSQLException): pd.read_json("test_read_json_malformed_file.json") os.remove("test_read_json_malformed_file.json") diff --git a/tests/unit/test_dataframe.py b/tests/unit/test_dataframe.py index e4e5d95d24..4d3e94f5f2 100644 --- a/tests/unit/test_dataframe.py +++ b/tests/unit/test_dataframe.py @@ -2,6 +2,7 @@ # # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. 
import logging
import re
from unittest import mock


def _create_fake_session(sql_simplifier_enabled=True):
    """Build a minimal fake session suitable for DataFrameReader unit tests.

    Args:
        sql_simplifier_enabled: Value stored on the fake session's
            ``sql_simplifier_enabled`` attribute.

    Returns:
        An autospec'd ``Session`` whose ``_conn`` is an autospec'd
        ``ServerConnection`` and which carries a ``SnowflakePlanBuilder`` and
        an ``Analyzer`` constructed around that same fake session.
    """

    def nop(name):
        # Identity stand-in used for name qualification on the fake session.
        return name

    fake_session = mock.create_autospec(snowflake.snowpark.session.Session)
    fake_session.sql_simplifier_enabled = sql_simplifier_enabled
    fake_session._cte_optimization_enabled = False
    fake_session._query_compilation_stage_enabled = False
    fake_session._join_alias_fix = False
    fake_session._conn = mock.create_autospec(ServerConnection)
    fake_session._conn._thread_safe_session_enabled = True
    # The plan builder and analyzer are built after the connection is attached
    # and before the remaining attributes are set; this order is kept as-is.
    fake_session._plan_builder = SnowflakePlanBuilder(fake_session)
    fake_session._analyzer = Analyzer(fake_session)
    fake_session._use_scoped_temp_objects = True
    fake_session._ast_batch = mock.create_autospec(AstBatch)
    fake_session.get_fully_qualified_name_if_possible = nop
    return fake_session


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_generic_error(format_type, caplog):
    """Schema-inference failures other than FileNotFoundError degrade gracefully.

    ``_infer_schema_for_file_format`` is patched to report a ``RuntimeError``
    in the exception slot of its result tuple. The reader must then:

    * still return a DataFrame,
    * switch its ``INFER_SCHEMA`` option off, and
    * log a warning that names the format, carries the original error text
      and announces the ``$1`` VARIANT fallback.
    """
    error = RuntimeError("Cannot infer schema: error 100069")

    def mock_infer(*args, **kwargs):
        # Result shape: (new_schema, schema_to_cast, transformations, exception).
        return None, None, None, error

    fake_session = _create_fake_session()
    reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

    with mock.patch(
        "snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
        mock_infer,
    ):
        with caplog.at_level(logging.WARNING):
            df = getattr(reader, format_type)("@stage/file")

    assert df is not None
    # The fallback path must disable inference on the reader itself; without
    # this check the option reset could be removed without any test failing.
    assert reader._cur_options["INFER_SCHEMA"] is False
    assert f"Could not infer schema for {format_type.upper()} file" in caplog.text
    assert "100069" in caplog.text
    assert "Falling back to $1 VARIANT schema" in caplog.text
@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_file_not_found(format_type):
    """A FileNotFoundError reported by schema inference is re-raised as-is.

    ``_infer_schema_for_file_format`` is patched to hand back a
    ``FileNotFoundError`` in the exception slot of its result tuple; calling
    the reader must surface that same error instead of falling back.
    """
    error = FileNotFoundError("Stage path does not exist or not authorized")

    def mock_infer(*args, **kwargs):
        # Result shape: (new_schema, schema_to_cast, transformations, exception).
        return None, None, None, error

    fake_session = _create_fake_session()
    reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

    with mock.patch(
        "snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
        mock_infer,
    ):
        with pytest.raises(FileNotFoundError, match="not authorized"):
            getattr(reader, format_type)("@stage/file")


@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
def test_read_semi_structured_infer_schema_success_no_warning(format_type, caplog):
    """Successful schema inference logs no fallback warning.

    ``_infer_schema_for_file_format`` is patched to return a one-column schema
    with no exception. The reader must return a DataFrame, emit no
    "Could not infer schema" warning, and leave ``INFER_SCHEMA`` enabled.
    """
    schema = [Attribute('"col1"', StringType())]
    schema_to_cast = [("$1:col1::VARCHAR", "col1")]

    def mock_infer(*args, **kwargs):
        # Result shape: (new_schema, schema_to_cast, transformations, exception).
        return schema, schema_to_cast, None, None

    fake_session = _create_fake_session()
    reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)

    with mock.patch(
        "snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
        mock_infer,
    ):
        with caplog.at_level(logging.WARNING):
            df = getattr(reader, format_type)("@stage/file")

    assert df is not None
    assert "Could not infer schema" not in caplog.text
    # The success path must not touch the option the caller set explicitly.
    assert reader._cur_options.get("INFER_SCHEMA") is True


def test_select_negative():
    AST_ENABLED = False
    set_ast_state(AstFlagSource.TEST, AST_ENABLED)