Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1ff8b81
chore(deps): bump unstructured from 0.10.27 to 0.17.2
devin-ai-integration[bot] Apr 18, 2025
5646f38
fix: update unstructured_parser.py to work with unstructured 0.17.2
devin-ai-integration[bot] Apr 18, 2025
a791703
fix: update variable names to avoid redefinition in unstructured_pars…
devin-ai-integration[bot] Apr 18, 2025
906124e
fix: resolve MyPy errors in unstructured_parser.py
devin-ai-integration[bot] Apr 18, 2025
c84bebc
fix: apply ruff formatting and fix import sorting
devin-ai-integration[bot] Apr 18, 2025
f2b959c
fix: update detect_filetype calls to match new API
devin-ai-integration[bot] Apr 18, 2025
08a365b
fix: update detect_filetype to handle file content detection properly
devin-ai-integration[bot] Apr 18, 2025
7ba8538
fix: improve markdown file detection for unstructured parser
devin-ai-integration[bot] Apr 18, 2025
d2bb776
fix: apply ruff formatting to unstructured_parser.py
devin-ai-integration[bot] Apr 18, 2025
3dbb175
fix: improve markdown file detection for unstructured parser
devin-ai-integration[bot] Apr 18, 2025
91f34da
fix: remove trailing whitespace to fix ruff format check
devin-ai-integration[bot] Apr 18, 2025
8199546
fix: update partition_pdf import path and add pi-heif dependency
devin-ai-integration[bot] Apr 18, 2025
22505fc
chore: update poetry.lock after adding pi-heif dependency
devin-ai-integration[bot] Apr 18, 2025
be3c07b
fix: update import path for unstructured.partition.pdf in 0.17.2
devin-ai-integration[bot] Apr 18, 2025
a36d12a
fix: add pi-heif as direct dependency and improve markdown file handling
devin-ai-integration[bot] Apr 18, 2025
f7ef188
fix: add pi-heif import to satisfy deptry check
devin-ai-integration[bot] Apr 18, 2025
66c6235
fix: improve markdown file type detection for unstructured parser
devin-ai-integration[bot] Apr 18, 2025
dfe037f
Fix formatting issues in unstructured_parser.py
devin-ai-integration[bot] Apr 18, 2025
a12b989
fix: improve PPTX file type detection for files without extensions
devin-ai-integration[bot] Apr 18, 2025
e77dc1b
style: fix formatting issues
devin-ai-integration[bot] Apr 18, 2025
089325f
fix: update error message format in unstructured parser to match test…
devin-ai-integration[bot] Apr 19, 2025
ac669e3
style: fix formatting issues
devin-ai-integration[bot] Apr 19, 2025
027fa8f
Merge branch 'main' into devin/1744958125-bump-unstructured-to-latest
aaronsteers Apr 19, 2025
fe3f617
fix: update error message format in unstructured parser to match expe…
devin-ai-integration[bot] Apr 19, 2025
bfa63fd
fix: format code to fix CI failures
devin-ai-integration[bot] Apr 19, 2025
4b9ba62
fix: return FileType.CSV instead of string to fix MyPy error
devin-ai-integration[bot] Apr 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2315,7 +2315,7 @@ definitions:
properties:
type:
type: string
enum: [ KeyTransformation ]
enum: [KeyTransformation]
prefix:
title: Key Prefix
description: Prefix to add for object keys. If not provided original keys remain unchanged.
Expand Down
209 changes: 157 additions & 52 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,19 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import mimetypes
import os
import traceback
from datetime import datetime
from io import BytesIO, IOBase
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast

import backoff
import dpath
import nltk
import pi_heif # Required for handling HEIF images
import requests
from unstructured.file_utils.filetype import (
EXT_TO_FILETYPE,
FILETYPE_TO_MIMETYPE,
STR_TO_FILETYPE,
FileType,
detect_filetype,
)
from unstructured.file_utils.filetype import FileType, detect_filetype

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
Expand Down Expand Up @@ -141,25 +137,38 @@ async def infer_schema(
format = _extract_format(config)
with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
filetype = self._get_filetype(file_handle, file)
if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
raise self._create_parse_error(
file,
self._get_file_type_error_message(filetype),
if (
isinstance(filetype, str) or filetype not in self._supported_file_types()
) and not format.skip_unprocessable_files:
error_message = self._get_file_type_error_message(filetype)
logger.error(f"File {file.uri} has unsupported type: {error_message}")
raise AirbyteTracedException(
message=error_message,
internal_message="Please check the logged errors for more information.",
failure_type=FailureType.config_error,
)

return {
"content": {
"type": "string",
"type": ["null", "string"],
"description": "Content of the file as markdown. Might be null if the file could not be parsed",
},
"document_key": {
"type": "string",
"type": ["null", "string"],
"description": "Unique identifier of the document, e.g. the file path",
},
"_ab_source_file_parse_error": {
"type": "string",
"type": ["null", "string"],
"description": "Error message if the file could not be parsed even though the file is supported",
},
"_ab_source_file_last_modified": {
"type": ["null", "string"],
"description": "Last modified timestamp of the source file",
},
"_ab_source_file_url": {
"type": ["null", "string"],
"description": "URL or path to the source file",
},
}

def parse_records(
Expand All @@ -178,20 +187,57 @@ def parse_records(
"content": markdown,
"document_key": file.uri,
"_ab_source_file_parse_error": None,
"_ab_source_file_last_modified": file.last_modified.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"_ab_source_file_url": file.uri,
}
except RecordParseError as e:
# RecordParseError is raised when the file can't be parsed because of a problem with the file content (either the file is not supported or the file is corrupted)
# if the skip_unprocessable_files flag is set, we log a warning and pass the error as part of the document
# otherwise, we raise the error to fail the sync
exception_str = str(e)
if format.skip_unprocessable_files:
logger.warning(
f"File {file.uri} caused an error during parsing: {exception_str}."
)
error_message = f"Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. Contact Support if you need assistance.\nfilename={file.uri} message={exception_str}"
yield {
"content": None,
"document_key": file.uri,
"_ab_source_file_parse_error": error_message,
"_ab_source_file_last_modified": file.last_modified.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"_ab_source_file_url": file.uri,
}
logger.warning(f"File {file.uri} cannot be parsed. Skipping it.")
else:
logger.error(
f"File {file.uri} caused an error during parsing: {exception_str}."
)
raise AirbyteTracedException(
message="Please check the logged errors for more information.",
internal_message=exception_str,
failure_type=FailureType.config_error,
)
except AirbyteTracedException as e:
if format.skip_unprocessable_files:
exception_str = str(e)
logger.warn(f"File {file.uri} caused an error during parsing: {exception_str}.")
logger.warning(
f"File {file.uri} caused an error during parsing: {exception_str}."
)
error_message = f"Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. Contact Support if you need assistance.\nfilename={file.uri} message={exception_str}"
yield {
"content": None,
"document_key": file.uri,
"_ab_source_file_parse_error": exception_str,
"_ab_source_file_parse_error": error_message,
"_ab_source_file_last_modified": file.last_modified.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"_ab_source_file_url": file.uri,
}
logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
logger.warning(f"File {file.uri} cannot be parsed. Skipping it.")
else:
raise e
except Exception as e:
Expand All @@ -218,14 +264,21 @@ def _read_file(
filetype: FileType | None = self._get_filetype(file_handle, remote_file)

if filetype is None or filetype not in self._supported_file_types():
raise self._create_parse_error(
remote_file,
self._get_file_type_error_message(filetype),
error_message = self._get_file_type_error_message(filetype)
logger.error(f"File {remote_file.uri} has unsupported type: {error_message}")
raise AirbyteTracedException(
message="Please check the logged errors for more information.",
internal_message=error_message,
failure_type=FailureType.config_error,
)
if filetype in {FileType.MD, FileType.TXT}:
file_content: bytes = file_handle.read()
decoded_content: str = optional_decode(file_content)
return decoded_content
try:
file_content: bytes = file_handle.read()
decoded_content: str = optional_decode(file_content)
return decoded_content
except Exception as e:
logger.error(f"Error reading {filetype} file: {str(e)}")
raise self._create_parse_error(remote_file, str(e))
if format.processing.mode == "local":
return self._read_file_locally(
file_handle,
Expand Down Expand Up @@ -335,10 +388,16 @@ def _read_file_remotely(

data = self._params_to_dict(format.parameters, strategy)

file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}
mime_type = (
mimetypes.guess_type(f"file.{filetype.name.lower()}")[0]
if filetype
else "application/octet-stream"
)

files = cast(Any, {"files": ("filename", file_handle, mime_type)})

response = requests.post(
f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data
f"{format.api_url}/general/v0/general", headers=headers, data=data, files=files
)

if response.status_code == 422:
Expand All @@ -364,24 +423,25 @@ def _read_file_locally(
# check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
raise Exception("unstructured library is not available")

file: Any = file_handle

# before the parsing logic is entered, the file is read completely to make sure it is in local memory
file_handle.seek(0)
file_handle.read()
file_content = file_handle.read()
file_handle.seek(0)

try:
if filetype == FileType.PDF:
# for PDF, read the file into a BytesIO object because some code paths in pdf parsing are doing an instance check on the file object and don't work with file-like objects
file_handle.seek(0)
with BytesIO(file_handle.read()) as file:
file_handle.seek(0)
# For all file types, create a fresh BytesIO to avoid issues with file-like objects
with BytesIO(file_content) as file:
if filetype == FileType.PDF:
elements = unstructured_partition_pdf(file=file, strategy=strategy)
elif filetype == FileType.DOCX:
elements = unstructured_partition_docx(file=file)
elif filetype == FileType.PPTX:
elements = unstructured_partition_pptx(file=file)
elif filetype == FileType.DOCX:
elements = unstructured_partition_docx(file=file)
elif filetype == FileType.PPTX:
elements = unstructured_partition_pptx(file=file)
else:
raise self._create_parse_error(
remote_file,
f"Unsupported file type {filetype} for local processing",
)
except Exception as e:
raise self._create_parse_error(remote_file, str(e))

Expand All @@ -405,8 +465,13 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
2. Use the file name if available
3. Use the file content
"""
if remote_file.mime_type and remote_file.mime_type in STR_TO_FILETYPE:
return STR_TO_FILETYPE[remote_file.mime_type]
if remote_file.mime_type:
for file_type in FileType:
if (
mimetypes.guess_type(f"file.{file_type.name.lower()}")[0]
== remote_file.mime_type
):
return file_type

# set name to none, otherwise unstructured will try to get the modified date from the local file system
if hasattr(file, "name"):
Expand All @@ -415,27 +480,67 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
# detect_filetype is either using the file name or file content
# if possible, try to leverage the file name to detect the file type
# if the file name is not available, use the file content
file_type: FileType | None = None
detected_type: FileType | None = None
try:
file_type = detect_filetype(
filename=remote_file.uri,
)
detected_type = detect_filetype(remote_file.uri)
except Exception:
# Path doesn't exist locally. Try something else...
pass

if file_type and file_type != FileType.UNK:
return file_type
if detected_type and detected_type != FileType.UNK:
return detected_type

type_based_on_content = detect_filetype(file=file)
file.seek(0) # detect_filetype is reading to read the file content, so we need to reset
file.seek(0)
try:
file_content = file.read(4096) # Read a sample of the file to detect type
file.seek(0)

if isinstance(file_content, bytes) and file_content.startswith(b"%PDF-"):
return FileType.PDF

if isinstance(file_content, bytes) and file_content.startswith(b"PK\x03\x04"):
if (
b"ppt/" in file_content
or b"application/vnd.openxmlformats-officedocument.presentationml"
in file_content
):
return FileType.PPTX
elif b"word/" in file_content or b"[Content_Types].xml" in file_content:
return FileType.DOCX

if file_content and isinstance(file_content, bytes):
try:
content_str = file_content.decode("utf-8", errors="ignore")
if (
content_str.lstrip().startswith("#")
or remote_file.mime_type == "text/markdown"
or remote_file.uri.endswith(".md")
):
return FileType.MD
elif content_str.strip() and not any(
c for c in content_str[:100] if ord(c) > 127
):
return FileType.TXT
except UnicodeDecodeError:
pass # Not a text file

type_based_on_content = FileType.UNK
except Exception as e:
type_based_on_content = FileType.UNK
file.seek(0) # Reset file position after reading

if type_based_on_content and type_based_on_content != FileType.UNK:
return type_based_on_content

extension = "." + remote_file.uri.split(".")[-1].lower()
if extension in EXT_TO_FILETYPE:
return EXT_TO_FILETYPE[extension]
if "." in remote_file.uri:
extension = "." + remote_file.uri.split(".")[-1].lower()
if extension == ".csv":
return FileType.CSV
for file_type in FileType:
if file_type.name.lower() == extension[1:].lower():
return file_type
if remote_file.uri.endswith(".md") or remote_file.mime_type == "text/markdown":
return FileType.MD

return None

Expand All @@ -444,7 +549,7 @@ def _supported_file_types(self) -> List[Any]:

def _get_file_type_error_message(
self,
file_type: FileType | None,
file_type: Union[FileType, str, None],
) -> str:
supported_file_types = ", ".join([str(type) for type in self._supported_file_types()])
return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}"
Expand Down
Loading
Loading