diff --git a/CHANGELOG.md b/CHANGELOG.md
index 16d69e13e..3d9996990 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 1.1.4
+
+* **Fix: Prevent download path conflicts using collision-safe temporary file generation**
+
 ## 1.1.3
 
 * **Fix: Remove unnecessary deletion operation in ES connector**
diff --git a/test/integration/connectors/utils/validation/source.py b/test/integration/connectors/utils/validation/source.py
index 7c2e7a7e4..6ef6ce256 100644
--- a/test/integration/connectors/utils/validation/source.py
+++ b/test/integration/connectors/utils/validation/source.py
@@ -102,7 +102,22 @@ def check_files(expected_output_dir: Path, all_file_data: list[FileData]):
 def check_files_in_paths(expected_output_dir: Path, current_output_dir: Path):
     expected_files = get_files(dir_path=expected_output_dir)
     current_files = get_files(dir_path=current_output_dir)
-    diff = set(expected_files) ^ set(current_files)
+
+    # Extract original filenames from tempfile names
+    actual_filenames = []
+    for current_file in current_files:
+        for expected_file in expected_files:
+            if current_file.endswith('_' + expected_file):
+                actual_filenames.append(expected_file)
+                break
+        else:
+            actual_filenames.append(
+                current_file.split('_', 1)[1] if '_' in current_file else current_file
+            )
+
+    expected_files.sort()
+    actual_filenames.sort()
+    diff = set(expected_files) ^ set(actual_filenames)
     assert not diff, "diff in files that exist: {}".format(", ".join(diff))
 
 
@@ -130,11 +145,23 @@ def check_raw_file_contents(
     configs: SourceValidationConfigs,
 ):
     current_files = get_files(dir_path=current_output_dir)
+    expected_files = get_files(dir_path=expected_output_dir)
     found_diff = False
     files = []
+
     for current_file in current_files:
+        # Extract original filename from tempfile name
+        original_filename = None
+        for expected_file in expected_files:
+            if current_file.endswith('_' + expected_file):
+                original_filename = expected_file
+                break
+        else:
+            original_filename = current_file.split('_', 1)[1] if '_' in current_file else current_file
+
         current_file_path = current_output_dir / current_file
-        expected_file_path = expected_output_dir / current_file
+        expected_file_path = expected_output_dir / original_filename
+
         if configs.detect_diff(expected_file_path, current_file_path):
             found_diff = True
             files.append(str(expected_file_path))
@@ -168,10 +195,30 @@ def run_expected_download_files_validation(
 
 def run_directory_structure_validation(expected_output_dir: Path, download_files: list[str]):
     directory_record = expected_output_dir / "directory_structure.json"
-    with directory_record.open("r") as directory_file:
-        directory_file_contents = json.load(directory_file)
-        directory_structure = directory_file_contents["directory_structure"]
-    assert directory_structure == download_files
+    with directory_record.open("r") as f:
+        directory_structure = json.load(f)["directory_structure"]
+
+    # Check if downloads match expected structure exactly (non-tempfile case)
+    if set(directory_structure) == set(download_files):
+        assert len(download_files) == len(set(download_files))
+        return
+
+    # Try tempfile validation logic
+    actual_filenames = []
+    for download_file in download_files:
+        for expected_filename in directory_structure:
+            if download_file.endswith('_' + expected_filename):
+                actual_filenames.append(expected_filename)
+                break
+        else:
+            actual_filenames.append(
+                download_file.split('_', 1)[1] if '_' in download_file else download_file
+            )
+
+    directory_structure.sort()
+    actual_filenames.sort()
+    assert directory_structure == actual_filenames
+    assert len(download_files) == len(set(download_files))
 
 
 def update_fixtures(
diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py
index 07d39da01..8a43ac845 100644
--- a/unstructured_ingest/__version__.py
+++ b/unstructured_ingest/__version__.py
@@ -1 +1 @@
-__version__ = "1.1.3"  # pragma: no cover
+__version__ = "1.1.4"  # pragma: no cover
diff --git a/unstructured_ingest/interfaces/downloader.py b/unstructured_ingest/interfaces/downloader.py
index d72dfe89f..c81a52dad 100644
--- a/unstructured_ingest/interfaces/downloader.py
+++ b/unstructured_ingest/interfaces/downloader.py
@@ -1,4 +1,5 @@
 import os
+import tempfile
 from abc import ABC
 from pathlib import Path
 from typing import Any, Optional, TypedDict, TypeVar, Union
@@ -36,11 +37,27 @@ class Downloader(BaseProcess, BaseConnector, ABC):
     def get_download_path(self, file_data: FileData) -> Optional[Path]:
         if not file_data.source_identifiers:
             return None
+
         rel_path = file_data.source_identifiers.relative_path
         if not rel_path:
             return None
-        rel_path = rel_path[1:] if rel_path.startswith("/") else rel_path
-        return self.download_dir / Path(rel_path)
+
+        original_filename = Path(rel_path).name
+        if not original_filename:
+            return None
+
+        download_dir = self.download_dir
+        download_dir.mkdir(parents=True, exist_ok=True)
+
+        fd, temp_path = tempfile.mkstemp(
+            suffix=f"_{original_filename}",
+            dir=download_dir,
+            text=False
+        )
+        os.close(fd)
+        os.unlink(temp_path)
+
+        return Path(temp_path)
 
     @staticmethod
     def is_float(value: str):
diff --git a/unstructured_ingest/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/processes/connectors/fsspec/fsspec.py
index ab6fded0e..3b7aa1b9e 100644
--- a/unstructured_ingest/processes/connectors/fsspec/fsspec.py
+++ b/unstructured_ingest/processes/connectors/fsspec/fsspec.py
@@ -270,6 +270,7 @@ class FsspecDownloader(Downloader):
     download_config: Optional[FsspecDownloaderConfigT] = field(
         default_factory=lambda: FsspecDownloaderConfig()
     )
+
 
     def is_async(self) -> bool:
         with self.connection_config.get_client(protocol=self.protocol) as client:
diff --git a/unstructured_ingest/utils/filesystem.py b/unstructured_ingest/utils/filesystem.py
index 0fa8d182e..861d4a450 100644
--- a/unstructured_ingest/utils/filesystem.py
+++ b/unstructured_ingest/utils/filesystem.py
@@ -5,7 +5,9 @@ when multiple processes operate on the same
 directory structures simultaneously.
 """
 
+import hashlib
 from pathlib import Path
+from typing import Optional
 
 
 def mkdir_concurrent_safe(path: Path) -> None:
@@ -24,4 +26,27 @@ def mkdir_concurrent_safe(path: Path) -> None:
         path.mkdir(parents=True, exist_ok=True)
     except FileExistsError:
         if not (path.exists() and path.is_dir()):
-            raise
\ No newline at end of file
+            raise
+
+
+def generate_hash_based_path(s3_key: str, base_dir: Optional[Path] = None) -> Optional[Path]:
+    """
+    Generate hash-based download path to prevent S3 path conflicts.
+
+    Prevents conflicts when S3's flat namespace is mapped to hierarchical filesystems.
+    E.g., 'foo' (file) and 'foo/document' (needs foo as directory) would conflict.
+    """
+    if not s3_key:
+        return None
+
+    normalized_path = s3_key.lstrip("/")
+    if not normalized_path or not normalized_path.strip():
+        return None
+
+    filename = Path(normalized_path).name
+    if not filename:
+        return None
+
+    dir_hash = hashlib.sha256(normalized_path.encode('utf-8')).hexdigest()[:12]
+    relative_path = Path(dir_hash) / filename
+    return base_dir / relative_path if base_dir else relative_path