diff --git a/airbyte_cdk/sources/file_based/exceptions.py b/airbyte_cdk/sources/file_based/exceptions.py index c75f3257f..cd727463a 100644 --- a/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte_cdk/sources/file_based/exceptions.py @@ -92,6 +92,12 @@ class RecordParseError(BaseFileBasedSourceError): pass +class ExcelCalamineParsingError(BaseFileBasedSourceError): + """Raised when Calamine engine fails to parse an Excel file.""" + + pass + + class SchemaInferenceError(BaseFileBasedSourceError): pass diff --git a/airbyte_cdk/sources/file_based/file_types/excel_parser.py b/airbyte_cdk/sources/file_based/file_types/excel_parser.py index f99ca0180..93896f14f 100644 --- a/airbyte_cdk/sources/file_based/file_types/excel_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/excel_parser.py @@ -3,6 +3,7 @@ # import logging +import warnings from io import IOBase from pathlib import Path from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union @@ -17,6 +18,7 @@ ) from airbyte_cdk.sources.file_based.exceptions import ( ConfigValidationError, + ExcelCalamineParsingError, FileBasedSourceError, RecordParseError, ) @@ -64,7 +66,7 @@ async def infer_schema( fields: Dict[str, str] = {} with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp: - df = self.open_and_parse_file(fp) + df = self.open_and_parse_file(fp, logger, file) for column, df_type in df.dtypes.items(): # Choose the broadest data type if the column's data type differs in dataframes prev_frame_column_type = fields.get(column) # type: ignore [call-overload] @@ -92,7 +94,7 @@ def parse_records( discovered_schema: Optional[Mapping[str, SchemaType]] = None, ) -> Iterable[Dict[str, Any]]: """ - Parses records from an Excel file based on the provided configuration. + Parses records from an Excel file with fallback error handling. Args: config (FileBasedStreamConfig): Configuration for the file-based stream. @@ -111,7 +113,7 @@ def parse_records( try: # Open and parse the file using the stream reader with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp: - df = self.open_and_parse_file(fp) + df = self.open_and_parse_file(fp, logger, file) # Yield records as dictionaries # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior @@ -180,15 +182,93 @@ def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None: logger.info(f"Expected ExcelFormat, got {excel_format}") raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) - @staticmethod - def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame: + def _open_and_parse_file_with_calamine( + self, + fp: Union[IOBase, str, Path], + logger: logging.Logger, + file: RemoteFile, + ) -> pd.DataFrame: + """Opens and parses Excel file using Calamine engine. + + Args: + fp: File pointer to the Excel file. + logger: Logger for logging information and errors. + file: Remote file information for logging context. + + Returns: + pd.DataFrame: Parsed data from the Excel file. + + Raises: + ExcelCalamineParsingError: If Calamine fails to parse the file. + """ + try: + return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return] + except BaseException as exc: + # Calamine engine raises PanicException(child of BaseException) if Calamine fails to parse the file. + # Checking if ValueError in exception arg to know if it was actually an error during parsing due to invalid values in cells. + # Otherwise, raise an exception. + if "ValueError" in str(exc): + logger.warning( + f"Calamine parsing failed for {file.file_uri_for_logging}, falling back to openpyxl: {exc}" + ) + raise ExcelCalamineParsingError( + f"Calamine engine failed to parse {file.file_uri_for_logging}", + filename=file.uri, + ) from exc + raise exc + + def _open_and_parse_file_with_openpyxl( + self, + fp: Union[IOBase, str, Path], + logger: logging.Logger, + file: RemoteFile, + ) -> pd.DataFrame: + """Opens and parses Excel file using Openpyxl engine. + + Args: + fp: File pointer to the Excel file. + logger: Logger for logging information and errors. + file: Remote file information for logging context. + + Returns: + pd.DataFrame: Parsed data from the Excel file. """ - Opens and parses the Excel file. + # Some file-like objects are not seekable. + if hasattr(fp, "seek"): + try: + fp.seek(0) # type: ignore [union-attr] + except OSError as exc: + logger.info( + f"Could not rewind stream for {file.file_uri_for_logging}; " + f"proceeding with openpyxl from current position: {exc}" + ) + + with warnings.catch_warnings(record=True) as warning_records: + warnings.simplefilter("always") + df = pd.ExcelFile(fp, engine="openpyxl").parse() # type: ignore [arg-type, call-overload] + + for warning in warning_records: + logger.warning(f"Openpyxl warning for {file.file_uri_for_logging}: {warning.message}") + + return df # type: ignore [no-any-return] + + def open_and_parse_file( + self, + fp: Union[IOBase, str, Path], + logger: logging.Logger, + file: RemoteFile, + ) -> pd.DataFrame: + """Opens and parses the Excel file with Calamine-first and Openpyxl fallback. Args: fp: File pointer to the Excel file. + logger: Logger for logging information and errors. + file: Remote file information for logging context. Returns: pd.DataFrame: Parsed data from the Excel file. """ - return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return] + try: + return self._open_and_parse_file_with_calamine(fp, logger, file) + except ExcelCalamineParsingError: + return self._open_and_parse_file_with_openpyxl(fp, logger, file) diff --git a/airbyte_cdk/sources/file_based/remote_file.py b/airbyte_cdk/sources/file_based/remote_file.py index 00669fbba..8d30a5333 100644 --- a/airbyte_cdk/sources/file_based/remote_file.py +++ b/airbyte_cdk/sources/file_based/remote_file.py @@ -17,6 +17,11 @@ class RemoteFile(BaseModel): last_modified: datetime mime_type: Optional[str] = None + @property + def file_uri_for_logging(self) -> str: + """Returns a user-friendly identifier for logging.""" + return self.uri + class UploadableRemoteFile(RemoteFile, ABC): """ @@ -49,13 +54,6 @@ def source_file_relative_path(self) -> str: """ return self.uri - @property - def file_uri_for_logging(self) -> str: - """ - Returns the URI for the file being logged. - """ - return self.uri - @property def source_uri(self) -> str: """ diff --git a/poetry.lock b/poetry.lock index f746538b0..d316437a1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1277,7 +1277,7 @@ description = "An implementation of lxml.xmlfile for the standard library" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "(python_version <= \"3.11\" or python_version >= \"3.12.0\") and extra == \"vector-db-based\"" +markers = "(extra == \"vector-db-based\" or extra == \"file-based\") and (python_version <= \"3.11\" or python_version >= \"3.12.0\")" files = [ {file = "et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa"}, {file = "et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54"}, @@ -3460,7 +3460,7 @@ description = "A Python library to read/write Excel 2010 xlsx/xlsm files" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "(python_version <= \"3.11\" or python_version >= \"3.12.0\") and extra == \"vector-db-based\"" +markers = "(extra == \"vector-db-based\" or extra == \"file-based\") and (python_version <= \"3.11\" or python_version >= \"3.12.0\")" files = [ {file = "openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2"}, {file = "openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050"}, @@ -7003,7 +7003,7 @@ cffi = ["cffi (>=1.17,<2.0)", "cffi (>=2.0.0b)"] [extras] dev = ["pytest"] -file-based = ["avro", "fastavro", "markdown", "pdf2image", "pdfminer.six", "pyarrow", "pytesseract", "python-calamine", "python-snappy", "unstructured", "unstructured.pytesseract"] +file-based = ["avro", "fastavro", "markdown", "openpyxl", "pdf2image", "pdfminer.six", "pyarrow", "pytesseract", "python-calamine", "python-snappy", "unstructured", "unstructured.pytesseract"] manifest-server = ["ddtrace", "fastapi", "uvicorn"] sql = ["sqlalchemy"] vector-db-based = ["cohere", "langchain_community", "langchain_core", "langchain_text_splitters", "openai", "tiktoken"] @@ -7011,4 +7011,4 @@ vector-db-based = ["cohere", "langchain_community", "langchain_core", "langchain [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.14" -content-hash = "c873e640db32d6ca60417764d97eafaa122eb155f004686b21fef4b055b58846" +content-hash = "eb792c4be3d1543876f6bff3a15f2fbc7d235ab7b49279b85089837210000e7e" diff --git a/pyproject.toml b/pyproject.toml index 70d6c8136..d0061c31a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ pdf2image = { version = "1.16.3", optional = true } pyarrow = { version = "^19.0.0", optional = true } pytesseract = { version = "0.3.10", optional = true } # Used indirectly by unstructured library python-calamine = { version = "0.2.3", optional = true } # TODO: Remove if unused +openpyxl = { version = "^3.1.0", optional = true } python-snappy = { version = "0.7.3", optional = true } # TODO: remove if unused tiktoken = { version = "0.8.0", optional = true } nltk = { version = "3.9.1", optional = true } @@ -120,7 +121,7 @@ deptry = "^0.23.0" dagger-io = "0.19.0" [tool.poetry.extras] -file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfminer.six", "unstructured.pytesseract", "pytesseract", "markdown", "python-calamine", "python-snappy"] +file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfminer.six", "unstructured.pytesseract", "pytesseract", "markdown", "python-calamine", "openpyxl", "python-snappy"] vector-db-based = ["langchain_community", "langchain_core", "langchain_text_splitters", "openai", "cohere", "tiktoken"] sql = ["sqlalchemy"] dev = ["pytest"] @@ -252,6 +253,7 @@ DEP002 = [ "cohere", "markdown", "openai", + "openpyxl", "pdf2image", "pdfminer.six", "pytesseract", diff --git a/unit_tests/sources/file_based/file_types/test_excel_parser.py b/unit_tests/sources/file_based/file_types/test_excel_parser.py index f744e9e8a..18850e9b0 100644 --- a/unit_tests/sources/file_based/file_types/test_excel_parser.py +++ b/unit_tests/sources/file_based/file_types/test_excel_parser.py @@ -4,6 +4,7 @@ import datetime +import warnings from io import BytesIO from unittest.mock import MagicMock, Mock, mock_open, patch @@ -136,3 +137,104 @@ def test_file_read_error(mock_stream_reader, mock_logger, file_config, remote_fi list( parser.parse_records(file_config, remote_file, mock_stream_reader, mock_logger) ) + + +class FakePanic(BaseException): + """Simulates the PyO3 PanicException which does not inherit from Exception.""" + + +def test_open_and_parse_file_falls_back_to_openpyxl(mock_logger): + parser = ExcelParser() + fp = BytesIO(b"test") + remote_file = RemoteFile(uri="s3://mybucket/test.xlsx", last_modified=datetime.datetime.now()) + + fallback_df = pd.DataFrame({"a": [1]}) + + calamine_excel_file = MagicMock() + + def calamine_parse_side_effect(): + raise FakePanic( + "failed to construct date: PyErr { type: , value: ValueError('year 20225 is out of range'), traceback: None }" + ) + + calamine_excel_file.parse.side_effect = calamine_parse_side_effect + + openpyxl_excel_file = MagicMock() + + def openpyxl_parse_side_effect(): + warnings.warn("Cell A146 has invalid date", UserWarning) + return fallback_df + + openpyxl_excel_file.parse.side_effect = openpyxl_parse_side_effect + + with ( + patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel, + ): + mock_excel.side_effect = [calamine_excel_file, openpyxl_excel_file] + + result = parser.open_and_parse_file(fp, mock_logger, remote_file) + + pd.testing.assert_frame_equal(result, fallback_df) + assert mock_logger.warning.call_count == 2 + assert "Openpyxl warning" in mock_logger.warning.call_args_list[1].args[0] + + +def test_open_and_parse_file_does_not_swallow_system_exit(mock_logger): + """Test that SystemExit is not caught by the BaseException handler. + + This test ensures that critical system-level exceptions like SystemExit and KeyboardInterrupt + are not accidentally caught and suppressed by our BaseException handler in the Calamine parsing + method. These exceptions should always propagate up to allow proper program termination. + """ + parser = ExcelParser() + fp = BytesIO(b"test") + remote_file = RemoteFile(uri="s3://mybucket/test.xlsx", last_modified=datetime.datetime.now()) + + with patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel: + mock_excel.return_value.parse.side_effect = SystemExit() + + with pytest.raises(SystemExit): + parser.open_and_parse_file(fp, mock_logger, remote_file) + + +@pytest.mark.parametrize( + "exc_cls", + [ + pytest.param(OSError, id="os-error"), + ], +) +def test_openpyxl_logs_info_when_seek_fails(mock_logger, remote_file, exc_cls): + """Test that openpyxl logs info when seek fails on non-seekable files. + + This test ensures that when falling back to openpyxl, if the file pointer + cannot be rewound (seek fails with OSError), an info-level log is emitted + and parsing proceeds from the current position. + """ + parser = ExcelParser() + fallback_df = pd.DataFrame({"a": [1]}) + + class FakeFP: + """Fake file-like object with a seek method that raises an exception.""" + + def __init__(self, exc): + self._exc = exc + + def seek(self, *args, **kwargs): + raise self._exc("not seekable") + + fp = FakeFP(exc_cls) + + openpyxl_excel_file = MagicMock() + openpyxl_excel_file.parse.return_value = fallback_df + + with patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel: + mock_excel.return_value = openpyxl_excel_file + + result = parser._open_and_parse_file_with_openpyxl(fp, mock_logger, remote_file) + + pd.testing.assert_frame_equal(result, fallback_df) + mock_logger.info.assert_called_once() + msg = mock_logger.info.call_args[0][0] + assert "Could not rewind stream" in msg + assert remote_file.file_uri_for_logging in msg + mock_excel.assert_called_once_with(fp, engine="openpyxl")