-
Notifications
You must be signed in to change notification settings - Fork 32
feat(file-based): Add Calamine-first with Openpyxl fallback for Excel parser #850
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 13 commits
5be5bee
65e8adc
a7664eb
34fe892
adfe576
88084ad
546bd46
67fa697
e431f9d
a82a2fa
6a38d55
fef9ac2
fd1939e
d2f691a
63d24a6
44f7df1
49f3e19
fffe027
9d6428c
0831b04
463be27
95fc5e3
38a1a1c
3277b70
7d73cc6
d2b0255
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,9 +3,10 @@ | |||||||||||||
| # | ||||||||||||||
|
|
||||||||||||||
| import logging | ||||||||||||||
| import warnings | ||||||||||||||
| from io import IOBase | ||||||||||||||
| from pathlib import Path | ||||||||||||||
| from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union | ||||||||||||||
| from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Type, Union, cast | ||||||||||||||
|
|
||||||||||||||
| import orjson | ||||||||||||||
| import pandas as pd | ||||||||||||||
|
|
@@ -17,6 +18,7 @@ | |||||||||||||
| ) | ||||||||||||||
| from airbyte_cdk.sources.file_based.exceptions import ( | ||||||||||||||
| ConfigValidationError, | ||||||||||||||
| ExcelCalamineParsingError, | ||||||||||||||
| FileBasedSourceError, | ||||||||||||||
| RecordParseError, | ||||||||||||||
| ) | ||||||||||||||
|
|
@@ -64,7 +66,7 @@ async def infer_schema( | |||||||||||||
| fields: Dict[str, str] = {} | ||||||||||||||
|
|
||||||||||||||
| with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp: | ||||||||||||||
| df = self.open_and_parse_file(fp) | ||||||||||||||
| df = self.open_and_parse_file(fp, logger, file) | ||||||||||||||
| for column, df_type in df.dtypes.items(): | ||||||||||||||
| # Choose the broadest data type if the column's data type differs in dataframes | ||||||||||||||
| prev_frame_column_type = fields.get(column) # type: ignore [call-overload] | ||||||||||||||
|
|
@@ -92,7 +94,7 @@ def parse_records( | |||||||||||||
| discovered_schema: Optional[Mapping[str, SchemaType]] = None, | ||||||||||||||
| ) -> Iterable[Dict[str, Any]]: | ||||||||||||||
| """ | ||||||||||||||
| Parses records from an Excel file based on the provided configuration. | ||||||||||||||
| Parses records from an Excel file with fallback error handling. | ||||||||||||||
|
|
||||||||||||||
| Args: | ||||||||||||||
| config (FileBasedStreamConfig): Configuration for the file-based stream. | ||||||||||||||
|
|
@@ -111,7 +113,7 @@ def parse_records( | |||||||||||||
| try: | ||||||||||||||
| # Open and parse the file using the stream reader | ||||||||||||||
| with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp: | ||||||||||||||
| df = self.open_and_parse_file(fp) | ||||||||||||||
| df = self.open_and_parse_file(fp, logger, file) | ||||||||||||||
| # Yield records as dictionaries | ||||||||||||||
| # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson | ||||||||||||||
| # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior | ||||||||||||||
|
|
@@ -181,14 +183,91 @@ def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None: | |||||||||||||
| raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) | ||||||||||||||
|
|
||||||||||||||
| @staticmethod | ||||||||||||||
| def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame: | ||||||||||||||
| def _open_and_parse_file_with_calamine( | ||||||||||||||
| fp: Union[IOBase, str, Path], | ||||||||||||||
| logger: logging.Logger, | ||||||||||||||
| file_info: RemoteFile, | ||||||||||||||
| ) -> pd.DataFrame: | ||||||||||||||
| """Opens and parses Excel file using Calamine engine. | ||||||||||||||
|
|
||||||||||||||
| Args: | ||||||||||||||
| fp: File pointer to the Excel file. | ||||||||||||||
| logger: Logger for logging information and errors. | ||||||||||||||
| file_info: Remote file information for logging context. | ||||||||||||||
|
|
||||||||||||||
|
||||||||||||||
| Args: | |
| fp (Union[IOBase, str, Path]): The file pointer, file path, or file-like object to parse. | |
| logger (Optional[logging.Logger]): Optional logger for warning and error messages. | |
| file_info (Optional[Union[str, RemoteFile]]): Optional file information (RemoteFile or string) used for logging context. |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we name this parameter `file` instead of `file_info`, since we know it is an instance of RemoteFile?
This applies here and in `_open_and_parse_file_with_calamine` and `_open_and_parse_file_with_openpyxl`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! I've renamed file_info to file across all three methods.
Changes Made:
- `_open_and_parse_file_with_calamine`: Renamed parameter and all references
- `_open_and_parse_file_with_openpyxl`: Renamed parameter and all references
- `open_and_parse_file`: Renamed parameter and all references
Rationale:
Since the type is RemoteFile, the shorter name file is more appropriate and makes the code more concise while maintaining clarity.
Local Verification:
- ✅ All Excel parser tests pass (5 passed, 1 skipped)
- ✅ Ruff format and lint pass
Commit: fffe027 - "refactor: Rename file_info parameter to file in Excel parsing methods"
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like with this change open_and_parse_file method should not be static, should be updated to use self
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! Since these methods are calling each other within the class, they should be regular instance methods using self instead of @staticmethod. I'll update all three methods to remove @staticmethod and use self for the method calls.
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this try/except needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The try/except is needed because some file-like objects (like certain stream types) may not support seeking. The AttributeError catches objects that don't have a seek method, and OSError catches objects that have seek but it fails (e.g., non-seekable streams).
If we don't catch these exceptions, the fallback would fail even though Openpyxl might still be able to parse the file from its current position. The comment explains this: "Some file-like objects may not be seekable; attempt openpyxl parsing anyway"
However, I agree this could be improved - we could log a warning when seeking fails to make it more visible that the stream might be partially consumed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in except block we have pass so it will call _open_and_parse_file_with_openpyxl anyway.
I suggest to add this try/except block into _open_and_parse_file_with_openpyxl and add logging instead of pass
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion! I've moved the fp.seek(0) try/except block into _open_and_parse_file_with_openpyxl and added logging instead of pass.
Changes made:
- ✅ Moved seek logic into `_open_and_parse_file_with_openpyxl` method
- ✅ Added `hasattr(fp, "seek")` check to avoid AttributeError on non-file-like objects (str/Path)
- ✅ Added info-level logging for seek failures: "Could not rewind stream for {file_uri}; proceeding with openpyxl from current position: {exc}"
- ✅ Removed duplicate seek logic from `open_and_parse_file` orchestration method
- ✅ Added brief comment explaining why we attempt to rewind
Why info-level instead of warning:
- The Calamine fallback warning and openpyxl warnings are already logged at warning level
- A failed seek is not necessarily a problem by itself since we still attempt parsing
- Using info keeps the warning count stable (the test asserts 2 warnings: fallback + openpyxl)
This refactoring centralizes fallback-specific concerns within the openpyxl path and simplifies the orchestration method to focus purely on flow control.
Commit: 63d24a6 - "refactor: Move seek logic into _open_and_parse_file_with_openpyxl"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also add a unit test that uses a non-seekable file, calls the openpyxl read path, and confirms that the info log is emitted? The test should cover non-seekable files for both the AttributeError and the OSError cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! I've added a parametrized unit test that covers both AttributeError and OSError cases for non-seekable files.
Test Added:
test_openpyxl_logs_info_when_seek_fails - Parametrized test with two cases:
- `attribute-error`: Tests when seek() raises AttributeError
- `os-error`: Tests when seek() raises OSError
Test Implementation:
- Creates a `FakeFP` class with a `seek()` method that raises the desired exception
- Tests `_open_and_parse_file_with_openpyxl` directly to isolate the behavior
- Verifies the info log is emitted with correct content:
  - Checks for "Could not rewind stream" message
  - Verifies file URI is included in the log message
- Confirms parsing proceeds and returns the expected DataFrame
- Verifies pd.ExcelFile is called with `engine="openpyxl"`
Local Verification:
- ✅ Both test cases pass (attribute-error and os-error)
- ✅ All Excel parser tests pass (6 passed, 1 skipped)
- ✅ Ruff format and lint pass
Commit: 44f7df1 - "test: Add parametrized test for non-seekable files in openpyxl fallback"
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
|
|
||
|
|
||
| import datetime | ||
| import warnings | ||
| from io import BytesIO | ||
| from unittest.mock import MagicMock, Mock, mock_open, patch | ||
|
|
||
|
|
@@ -136,3 +137,55 @@ def test_file_read_error(mock_stream_reader, mock_logger, file_config, remote_fi | |
| list( | ||
| parser.parse_records(file_config, remote_file, mock_stream_reader, mock_logger) | ||
| ) | ||
|
|
||
|
|
||
| class FakePanic(BaseException): | ||
| """Simulates the PyO3 PanicException which does not inherit from Exception.""" | ||
|
|
||
|
|
||
| def test_open_and_parse_file_falls_back_to_openpyxl(mock_logger): | ||
| parser = ExcelParser() | ||
| fp = BytesIO(b"test") | ||
| remote_file = RemoteFile(uri="s3://mybucket/test.xlsx", last_modified=datetime.datetime.now()) | ||
|
|
||
| fallback_df = pd.DataFrame({"a": [1]}) | ||
|
|
||
| calamine_excel_file = MagicMock() | ||
|
|
||
| def calamine_parse_side_effect(): | ||
| raise FakePanic( | ||
| "failed to construct date: PyErr { type: <class 'ValueError'>, value: ValueError('year 20225 is out of range'), traceback: None }" | ||
| ) | ||
|
|
||
| calamine_excel_file.parse.side_effect = calamine_parse_side_effect | ||
|
|
||
| openpyxl_excel_file = MagicMock() | ||
|
|
||
| def openpyxl_parse_side_effect(): | ||
| warnings.warn("Cell A146 has invalid date", UserWarning) | ||
| return fallback_df | ||
|
|
||
| openpyxl_excel_file.parse.side_effect = openpyxl_parse_side_effect | ||
|
|
||
| with ( | ||
| patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel, | ||
| ): | ||
| mock_excel.side_effect = [calamine_excel_file, openpyxl_excel_file] | ||
|
|
||
| result = parser.open_and_parse_file(fp, mock_logger, remote_file) | ||
|
|
||
| pd.testing.assert_frame_equal(result, fallback_df) | ||
| assert mock_logger.warning.call_count == 2 | ||
| assert "Openpyxl warning" in mock_logger.warning.call_args_list[1].args[0] | ||
|
|
||
|
|
||
| def test_open_and_parse_file_does_not_swallow_keyboard_interrupt(mock_logger): | ||
| parser = ExcelParser() | ||
| fp = BytesIO(b"test") | ||
| remote_file = RemoteFile(uri="s3://mybucket/test.xlsx", last_modified=datetime.datetime.now()) | ||
|
|
||
| with patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel: | ||
| mock_excel.return_value.parse.side_effect = KeyboardInterrupt() | ||
|
||
|
|
||
| with pytest.raises(KeyboardInterrupt): | ||
| parser.open_and_parse_file(fp, mock_logger, remote_file) | ||
Uh oh!
There was an error while loading. Please reload this page.