Skip to content

Commit fe3f617

Browse files
fix: update error message format in unstructured parser to match expected format
Co-Authored-By: Aaron <AJ> Steers <[email protected]>
1 parent 027fa8f commit fe3f617

File tree

1 file changed

+28
-6
lines changed

1 file changed

+28
-6
lines changed

airbyte_cdk/sources/file_based/file_types/unstructured_parser.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ async def infer_schema(
137137
format = _extract_format(config)
138138
with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
139139
filetype = self._get_filetype(file_handle, file)
140-
if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
140+
if (isinstance(filetype, str) or filetype not in self._supported_file_types()) and not format.skip_unprocessable_files:
141141
error_message = self._get_file_type_error_message(filetype)
142142
logger.error(f"File {file.uri} has unsupported type: {error_message}")
143143
raise AirbyteTracedException(
@@ -194,15 +194,16 @@ def parse_records(
194194
# RecordParseError is raised when the file can't be parsed because of a problem with the file content (either the file is not supported or the file is corrupted)
195195
# if the skip_unprocessable_files flag is set, we log a warning and pass the error as part of the document
196196
# otherwise, we raise the error to fail the sync
197+
exception_str = str(e)
197198
if format.skip_unprocessable_files:
198-
exception_str = str(e)
199199
logger.warning(
200200
f"File {file.uri} caused an error during parsing: {exception_str}."
201201
)
202+
error_message = f"Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. Contact Support if you need assistance.\nfilename={file.uri} message={exception_str}"
202203
yield {
203204
"content": None,
204205
"document_key": file.uri,
205-
"_ab_source_file_parse_error": exception_str,
206+
"_ab_source_file_parse_error": error_message,
206207
"_ab_source_file_last_modified": file.last_modified.strftime(
207208
"%Y-%m-%dT%H:%M:%S.%fZ"
208209
),
@@ -218,6 +219,25 @@ def parse_records(
218219
internal_message=exception_str,
219220
failure_type=FailureType.config_error,
220221
)
222+
except AirbyteTracedException as e:
223+
if format.skip_unprocessable_files:
224+
exception_str = str(e)
225+
logger.warning(
226+
f"File {file.uri} caused an error during parsing: {exception_str}."
227+
)
228+
error_message = f"Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. Contact Support if you need assistance.\nfilename={file.uri} message={exception_str}"
229+
yield {
230+
"content": None,
231+
"document_key": file.uri,
232+
"_ab_source_file_parse_error": error_message,
233+
"_ab_source_file_last_modified": file.last_modified.strftime(
234+
"%Y-%m-%dT%H:%M:%S.%fZ"
235+
),
236+
"_ab_source_file_url": file.uri,
237+
}
238+
logger.warning(f"File {file.uri} cannot be parsed. Skipping it.")
239+
else:
240+
raise e
221241
except Exception as e:
222242
exception_str = str(e)
223243
logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
@@ -245,8 +265,8 @@ def _read_file(
245265
error_message = self._get_file_type_error_message(filetype)
246266
logger.error(f"File {remote_file.uri} has unsupported type: {error_message}")
247267
raise AirbyteTracedException(
248-
message=error_message,
249-
internal_message="Please check the logged errors for more information.",
268+
message="Please check the logged errors for more information.",
269+
internal_message=error_message,
250270
failure_type=FailureType.config_error,
251271
)
252272
if filetype in {FileType.MD, FileType.TXT}:
@@ -512,6 +532,8 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
512532

513533
if "." in remote_file.uri:
514534
extension = "." + remote_file.uri.split(".")[-1].lower()
535+
if extension == ".csv":
536+
return "CSV"
515537
for file_type in FileType:
516538
if file_type.name.lower() == extension[1:].lower():
517539
return file_type
@@ -525,7 +547,7 @@ def _supported_file_types(self) -> List[Any]:
525547

526548
def _get_file_type_error_message(
527549
self,
528-
file_type: FileType | None,
550+
file_type: Union[FileType, str, None],
529551
) -> str:
530552
supported_file_types = ", ".join([str(type) for type in self._supported_file_types()])
531553
return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}"

0 commit comments

Comments
 (0)