Skip to content

Commit 2243ab8

Browse files
Fix AssertionError crash when INFER_SCHEMA fails for JSON/ORC/AVRO files (#4129)
1 parent 8216ff1 commit 2243ab8

File tree

3 files changed

+114
-13
lines changed

3 files changed

+114
-13
lines changed

src/snowflake/snowpark/dataframe_reader.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1624,14 +1624,25 @@ def _read_semi_structured_file(self, path: str, format: str) -> DataFrame:
16241624
use_user_schema = True
16251625

16261626
elif self._infer_schema:
1627-
(
1628-
new_schema,
1629-
schema_to_cast,
1630-
read_file_transformations,
1631-
_, # we don't check for error in case of infer schema failures. We use $1, Variant type
1632-
) = self._infer_schema_for_file_format(path, format)
1633-
if new_schema:
1634-
schema = new_schema
1627+
if not isinstance(self._session._conn, MockServerConnection):
1628+
(
1629+
new_schema,
1630+
schema_to_cast,
1631+
read_file_transformations,
1632+
infer_schema_exception,
1633+
) = self._infer_schema_for_file_format(path, format)
1634+
if new_schema:
1635+
schema = new_schema
1636+
elif infer_schema_exception is not None:
1637+
if isinstance(infer_schema_exception, FileNotFoundError):
1638+
raise infer_schema_exception
1639+
logger.warning(
1640+
f"Could not infer schema for {format} file due to exception: "
1641+
f"{infer_schema_exception}. "
1642+
"\nFalling back to $1 VARIANT schema. "
1643+
"Please use DataFrameReader.schema() to specify a user schema for the file."
1644+
)
1645+
self._cur_options["INFER_SCHEMA"] = False
16351646

16361647
metadata_project, metadata_schema = self._get_metadata_project_and_schema()
16371648

tests/integ/modin/io/test_read_json.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import pytest
1717

1818
import snowflake.snowpark.modin.plugin # noqa: F401
19+
from snowflake.snowpark.exceptions import SnowparkSQLException
1920
from tests.integ.modin.utils import assert_frame_equal
2021
from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker
2122
from tests.utils import Utils
@@ -294,9 +295,6 @@ def test_read_json_staged_folder():
294295

295296

296297
@sql_count_checker(query_count=4)
297-
@pytest.mark.xfail(
298-
reason="SNOW-1336174: Remove xfail by handling empty JSON files", strict=True
299-
)
300298
def test_read_json_empty_file():
301299
with open("test_read_json_empty_file.json", "w"):
302300
pass
@@ -307,13 +305,13 @@ def test_read_json_empty_file():
307305
os.remove("test_read_json_empty_file.json")
308306

309307

310-
@sql_count_checker(query_count=3)
308+
@sql_count_checker(query_count=5)
311309
def test_read_json_malformed_file_negative():
312310

313311
with open("test_read_json_malformed_file.json", "w") as f:
314312
f.write("{a: 3, key_no_value}")
315313

316-
with pytest.raises(AssertionError):
314+
with pytest.raises(SnowparkSQLException):
317315
pd.read_json("test_read_json_malformed_file.json")
318316

319317
os.remove("test_read_json_malformed_file.json")

tests/unit/test_dataframe.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#
33
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
44
#
5+
import logging
56
import re
67
from unittest import mock
78

@@ -143,6 +144,97 @@ def nop(name):
143144
)
144145

145146

147+
def _create_fake_session(sql_simplifier_enabled=True):
148+
"""Build a minimal fake session suitable for DataFrameReader unit tests."""
149+
150+
def nop(name):
151+
return name
152+
153+
fake_session = mock.create_autospec(snowflake.snowpark.session.Session)
154+
fake_session.sql_simplifier_enabled = sql_simplifier_enabled
155+
fake_session._cte_optimization_enabled = False
156+
fake_session._query_compilation_stage_enabled = False
157+
fake_session._join_alias_fix = False
158+
fake_session._conn = mock.create_autospec(ServerConnection)
159+
fake_session._conn._thread_safe_session_enabled = True
160+
fake_session._plan_builder = SnowflakePlanBuilder(fake_session)
161+
fake_session._analyzer = Analyzer(fake_session)
162+
fake_session._use_scoped_temp_objects = True
163+
fake_session._ast_batch = mock.create_autospec(AstBatch)
164+
fake_session.get_fully_qualified_name_if_possible = nop
165+
return fake_session
166+
167+
168+
@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
169+
def test_read_semi_structured_infer_schema_generic_error(format_type, caplog):
170+
"""When _infer_schema_for_file_format returns a non-FileNotFoundError,
171+
the reader should log a warning, set INFER_SCHEMA=False, and still
172+
return a DataFrame with the $1 VARIANT fallback schema."""
173+
error = RuntimeError("Cannot infer schema: error 100069")
174+
175+
def mock_infer(*args, **kwargs):
176+
return None, None, None, error
177+
178+
fake_session = _create_fake_session()
179+
reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)
180+
181+
with mock.patch(
182+
"snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
183+
mock_infer,
184+
):
185+
with caplog.at_level(logging.WARNING):
186+
df = getattr(reader, format_type)("@stage/file")
187+
188+
assert df is not None
189+
assert f"Could not infer schema for {format_type.upper()} file" in caplog.text
190+
assert "100069" in caplog.text
191+
assert "Falling back to $1 VARIANT schema" in caplog.text
192+
193+
194+
@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
195+
def test_read_semi_structured_infer_schema_file_not_found(format_type):
196+
"""When _infer_schema_for_file_format returns a FileNotFoundError,
197+
the reader should re-raise it directly."""
198+
error = FileNotFoundError("Stage path does not exist or not authorized")
199+
200+
def mock_infer(*args, **kwargs):
201+
return None, None, None, error
202+
203+
fake_session = _create_fake_session()
204+
reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)
205+
206+
with mock.patch(
207+
"snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
208+
mock_infer,
209+
):
210+
with pytest.raises(FileNotFoundError, match="not authorized"):
211+
getattr(reader, format_type)("@stage/file")
212+
213+
214+
@pytest.mark.parametrize("format_type", ["json", "avro", "orc", "parquet"])
215+
def test_read_semi_structured_infer_schema_success_no_warning(format_type, caplog):
216+
"""When _infer_schema_for_file_format succeeds, no warning should be logged
217+
and INFER_SCHEMA should remain True."""
218+
schema = [Attribute('"col1"', StringType())]
219+
schema_to_cast = [("$1:col1::VARCHAR", "col1")]
220+
221+
def mock_infer(*args, **kwargs):
222+
return schema, schema_to_cast, None, None
223+
224+
fake_session = _create_fake_session()
225+
reader = DataFrameReader(fake_session).option("INFER_SCHEMA", True)
226+
227+
with mock.patch(
228+
"snowflake.snowpark.dataframe_reader.DataFrameReader._infer_schema_for_file_format",
229+
mock_infer,
230+
):
231+
with caplog.at_level(logging.WARNING):
232+
df = getattr(reader, format_type)("@stage/file")
233+
234+
assert df is not None
235+
assert "Could not infer schema" not in caplog.text
236+
237+
146238
def test_select_negative():
147239
AST_ENABLED = False
148240
set_ast_state(AstFlagSource.TEST, AST_ENABLED)

0 commit comments

Comments
 (0)