From 7f9e207015b928486a856f22ab91956de05f2fca Mon Sep 17 00:00:00 2001 From: CyMule Date: Tue, 29 Jul 2025 16:17:24 -0400 Subject: [PATCH 01/10] fix: prevent S3 path conflicts using hash-based directory isolation --- unstructured_ingest/interfaces/downloader.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/unstructured_ingest/interfaces/downloader.py b/unstructured_ingest/interfaces/downloader.py index d72dfe89f..99709b45a 100644 --- a/unstructured_ingest/interfaces/downloader.py +++ b/unstructured_ingest/interfaces/downloader.py @@ -1,3 +1,4 @@ +import hashlib import os from abc import ABC from pathlib import Path @@ -36,11 +37,21 @@ class Downloader(BaseProcess, BaseConnector, ABC): def get_download_path(self, file_data: FileData) -> Optional[Path]: if not file_data.source_identifiers: return None + rel_path = file_data.source_identifiers.relative_path if not rel_path: return None - rel_path = rel_path[1:] if rel_path.startswith("/") else rel_path - return self.download_dir / Path(rel_path) + + normalized_path = rel_path[1:] if rel_path.startswith("/") else rel_path + if not normalized_path or not normalized_path.strip(): + return None + + filename = Path(normalized_path).name + if not filename: + return None + + dir_hash = hashlib.sha256(normalized_path.encode('utf-8')).hexdigest()[:12] + return self.download_dir / dir_hash / filename @staticmethod def is_float(value: str): From ef832ccfcd24688b03d6f3834fe94c33c174f499 Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 15:47:15 -0400 Subject: [PATCH 02/10] version --- CHANGELOG.md | 4 ++++ unstructured_ingest/__version__.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 16d69e13e..8f4e13dcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.1.4 + +* **Fix**: prevent S3 path conflicts using hash-based directory isolation + ## 1.1.3 * **Fix: Remove unnecessary deletion operation in ES connector** 
diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 07d39da01..8a43ac845 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "1.1.3" # pragma: no cover +__version__ = "1.1.4" # pragma: no cover From aca99e51f33bcc9bc27bcd10681b113af02f7037 Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 11:38:44 -0400 Subject: [PATCH 03/10] update test --- .../expected_results/s3/directory_structure.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/integration/connectors/expected_results/s3/directory_structure.json b/test/integration/connectors/expected_results/s3/directory_structure.json index 50a3f2db0..c741a1d96 100644 --- a/test/integration/connectors/expected_results/s3/directory_structure.json +++ b/test/integration/connectors/expected_results/s3/directory_structure.json @@ -1,8 +1,8 @@ { "directory_structure": [ - "2023-Jan-economic-outlook.pdf", - "Silent-Giant-(1).pdf", - "page-with-formula.pdf", - "recalibrating-risk-report.pdf" + "1d35c9052852/recalibrating-risk-report.pdf", + "44856114dace/Silent-Giant-(1).pdf", + "6fb4cb700561/page-with-formula.pdf", + "d48a58ba3f07/2023-Jan-economic-outlook.pdf" ] } \ No newline at end of file From 310277537f262c43d619491bbe71ae7f122b89ba Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 12:37:30 -0400 Subject: [PATCH 04/10] cleanup --- ...y_structure.json => expected_s3_keys.json} | 2 +- ...y_structure.json => expected_s3_keys.json} | 2 +- .../s3/directory_structure.json | 8 ------ .../expected_results/s3/expected_s3_keys.json | 8 ++++++ .../connectors/utils/validation/source.py | 19 +++++++++---- unstructured_ingest/interfaces/downloader.py | 13 ++------- unstructured_ingest/utils/filesystem.py | 27 ++++++++++++++++++- 7 files changed, 52 insertions(+), 27 deletions(-) rename test/integration/connectors/expected_results/s3-minio/{directory_structure.json => 
expected_s3_keys.json} (59%) rename test/integration/connectors/expected_results/s3-specialchar/{directory_structure.json => expected_s3_keys.json} (68%) delete mode 100644 test/integration/connectors/expected_results/s3/directory_structure.json create mode 100644 test/integration/connectors/expected_results/s3/expected_s3_keys.json diff --git a/test/integration/connectors/expected_results/s3-minio/directory_structure.json b/test/integration/connectors/expected_results/s3-minio/expected_s3_keys.json similarity index 59% rename from test/integration/connectors/expected_results/s3-minio/directory_structure.json rename to test/integration/connectors/expected_results/s3-minio/expected_s3_keys.json index 416044a4d..867581e3f 100644 --- a/test/integration/connectors/expected_results/s3-minio/directory_structure.json +++ b/test/integration/connectors/expected_results/s3-minio/expected_s3_keys.json @@ -1,5 +1,5 @@ { - "directory_structure": [ + "s3_keys": [ "wiki_movie_plots_small.csv" ] } \ No newline at end of file diff --git a/test/integration/connectors/expected_results/s3-specialchar/directory_structure.json b/test/integration/connectors/expected_results/s3-specialchar/expected_s3_keys.json similarity index 68% rename from test/integration/connectors/expected_results/s3-specialchar/directory_structure.json rename to test/integration/connectors/expected_results/s3-specialchar/expected_s3_keys.json index d9deb69e3..ebdee84c3 100644 --- a/test/integration/connectors/expected_results/s3-specialchar/directory_structure.json +++ b/test/integration/connectors/expected_results/s3-specialchar/expected_s3_keys.json @@ -1,5 +1,5 @@ { - "directory_structure": [ + "s3_keys": [ "Why_is_the_sky_blue?.txt", "[test]?*.txt" ] diff --git a/test/integration/connectors/expected_results/s3/directory_structure.json b/test/integration/connectors/expected_results/s3/directory_structure.json deleted file mode 100644 index c741a1d96..000000000 --- 
a/test/integration/connectors/expected_results/s3/directory_structure.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "directory_structure": [ - "1d35c9052852/recalibrating-risk-report.pdf", - "44856114dace/Silent-Giant-(1).pdf", - "6fb4cb700561/page-with-formula.pdf", - "d48a58ba3f07/2023-Jan-economic-outlook.pdf" - ] -} \ No newline at end of file diff --git a/test/integration/connectors/expected_results/s3/expected_s3_keys.json b/test/integration/connectors/expected_results/s3/expected_s3_keys.json new file mode 100644 index 000000000..ab9f86a31 --- /dev/null +++ b/test/integration/connectors/expected_results/s3/expected_s3_keys.json @@ -0,0 +1,8 @@ +{ + "s3_keys": [ + "2023-Jan-economic-outlook.pdf", + "Silent-Giant-(1).pdf", + "page-with-formula.pdf", + "recalibrating-risk-report.pdf" + ] +} \ No newline at end of file diff --git a/test/integration/connectors/utils/validation/source.py b/test/integration/connectors/utils/validation/source.py index 7c2e7a7e4..bc66daeae 100644 --- a/test/integration/connectors/utils/validation/source.py +++ b/test/integration/connectors/utils/validation/source.py @@ -10,6 +10,7 @@ from test.integration.connectors.utils.validation.utils import ValidationConfig from unstructured_ingest.data_types.file_data import FileData from unstructured_ingest.interfaces import Downloader, Indexer +from unstructured_ingest.utils.filesystem import generate_hash_based_path NONSTANDARD_METADATA_FIELDS = { "additional_metadata.@microsoft.graph.downloadUrl": [ @@ -167,11 +168,19 @@ def run_expected_download_files_validation( def run_directory_structure_validation(expected_output_dir: Path, download_files: list[str]): - directory_record = expected_output_dir / "directory_structure.json" - with directory_record.open("r") as directory_file: - directory_file_contents = json.load(directory_file) - directory_structure = directory_file_contents["directory_structure"] - assert directory_structure == download_files + s3_keys_file = expected_output_dir / 
"expected_s3_keys.json" + with s3_keys_file.open("r") as f: + s3_keys = json.load(f)["s3_keys"] + + expected_paths = [] + for s3_key in s3_keys: + hash_based_path = generate_hash_based_path(s3_key) + if hash_based_path: + expected_paths.append(str(hash_based_path)) + + expected_paths.sort() + download_files.sort() + assert expected_paths == download_files def update_fixtures( diff --git a/unstructured_ingest/interfaces/downloader.py b/unstructured_ingest/interfaces/downloader.py index 99709b45a..bf343b9fe 100644 --- a/unstructured_ingest/interfaces/downloader.py +++ b/unstructured_ingest/interfaces/downloader.py @@ -1,4 +1,3 @@ -import hashlib import os from abc import ABC from pathlib import Path @@ -9,6 +8,7 @@ from unstructured_ingest.data_types.file_data import FileData from unstructured_ingest.interfaces.connector import BaseConnector from unstructured_ingest.interfaces.process import BaseProcess +from unstructured_ingest.utils.filesystem import generate_hash_based_path class DownloaderConfig(BaseModel): @@ -42,16 +42,7 @@ def get_download_path(self, file_data: FileData) -> Optional[Path]: if not rel_path: return None - normalized_path = rel_path[1:] if rel_path.startswith("/") else rel_path - if not normalized_path or not normalized_path.strip(): - return None - - filename = Path(normalized_path).name - if not filename: - return None - - dir_hash = hashlib.sha256(normalized_path.encode('utf-8')).hexdigest()[:12] - return self.download_dir / dir_hash / filename + return generate_hash_based_path(rel_path, self.download_dir) @staticmethod def is_float(value: str): diff --git a/unstructured_ingest/utils/filesystem.py b/unstructured_ingest/utils/filesystem.py index 0fa8d182e..861d4a450 100644 --- a/unstructured_ingest/utils/filesystem.py +++ b/unstructured_ingest/utils/filesystem.py @@ -5,7 +5,9 @@ when multiple processes operate on the same directory structures simultaneously. 
""" +import hashlib from pathlib import Path +from typing import Optional def mkdir_concurrent_safe(path: Path) -> None: @@ -24,4 +26,27 @@ def mkdir_concurrent_safe(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) except FileExistsError: if not (path.exists() and path.is_dir()): - raise \ No newline at end of file + raise + + +def generate_hash_based_path(s3_key: str, base_dir: Optional[Path] = None) -> Optional[Path]: + """ + Generate hash-based download path to prevent S3 path conflicts. + + Prevents conflicts when S3's flat namespace is mapped to hierarchical filesystems. + E.g., 'foo' (file) and 'foo/document' (needs foo as directory) would conflict. + """ + if not s3_key: + return None + + normalized_path = s3_key.lstrip("/") + if not normalized_path or not normalized_path.strip(): + return None + + filename = Path(normalized_path).name + if not filename: + return None + + dir_hash = hashlib.sha256(normalized_path.encode('utf-8')).hexdigest()[:12] + relative_path = Path(dir_hash) / filename + return base_dir / relative_path if base_dir else relative_path \ No newline at end of file From eedf93cdcc7d767036e97d2f53549c2923d36016 Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 13:27:56 -0400 Subject: [PATCH 05/10] target s3 --- .../connectors/utils/validation/source.py | 29 ++++++++++++------- unstructured_ingest/interfaces/downloader.py | 4 +-- .../processes/connectors/fsspec/fsspec.py | 12 +++++++- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/test/integration/connectors/utils/validation/source.py b/test/integration/connectors/utils/validation/source.py index bc66daeae..2f9c97ab8 100644 --- a/test/integration/connectors/utils/validation/source.py +++ b/test/integration/connectors/utils/validation/source.py @@ -169,18 +169,25 @@ def run_expected_download_files_validation( def run_directory_structure_validation(expected_output_dir: Path, download_files: list[str]): s3_keys_file = expected_output_dir / 
"expected_s3_keys.json" - with s3_keys_file.open("r") as f: - s3_keys = json.load(f)["s3_keys"] - expected_paths = [] - for s3_key in s3_keys: - hash_based_path = generate_hash_based_path(s3_key) - if hash_based_path: - expected_paths.append(str(hash_based_path)) - - expected_paths.sort() - download_files.sort() - assert expected_paths == download_files + if s3_keys_file.exists(): + with s3_keys_file.open("r") as f: + s3_keys = json.load(f)["s3_keys"] + + expected_paths = [] + for s3_key in s3_keys: + hash_based_path = generate_hash_based_path(s3_key) + if hash_based_path: + expected_paths.append(str(hash_based_path)) + + expected_paths.sort() + download_files.sort() + assert expected_paths == download_files + else: + directory_record = expected_output_dir / "directory_structure.json" + with directory_record.open("r") as f: + directory_structure = json.load(f)["directory_structure"] + assert directory_structure == download_files def update_fixtures( diff --git a/unstructured_ingest/interfaces/downloader.py b/unstructured_ingest/interfaces/downloader.py index bf343b9fe..01f3c4787 100644 --- a/unstructured_ingest/interfaces/downloader.py +++ b/unstructured_ingest/interfaces/downloader.py @@ -8,7 +8,6 @@ from unstructured_ingest.data_types.file_data import FileData from unstructured_ingest.interfaces.connector import BaseConnector from unstructured_ingest.interfaces.process import BaseProcess -from unstructured_ingest.utils.filesystem import generate_hash_based_path class DownloaderConfig(BaseModel): @@ -42,7 +41,8 @@ def get_download_path(self, file_data: FileData) -> Optional[Path]: if not rel_path: return None - return generate_hash_based_path(rel_path, self.download_dir) + rel_path = rel_path[1:] if rel_path.startswith("/") else rel_path + return self.download_dir / Path(rel_path) @staticmethod def is_float(value: str): diff --git a/unstructured_ingest/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/processes/connectors/fsspec/fsspec.py index 
ab6fded0e..a483d70b9 100644 --- a/unstructured_ingest/processes/connectors/fsspec/fsspec.py +++ b/unstructured_ingest/processes/connectors/fsspec/fsspec.py @@ -29,7 +29,7 @@ UploaderConfig, ) from unstructured_ingest.processes.connectors.fsspec.utils import sterilize_dict -from unstructured_ingest.utils.filesystem import mkdir_concurrent_safe +from unstructured_ingest.utils.filesystem import generate_hash_based_path, mkdir_concurrent_safe if TYPE_CHECKING: from fsspec import AbstractFileSystem @@ -270,6 +270,16 @@ class FsspecDownloader(Downloader): download_config: Optional[FsspecDownloaderConfigT] = field( default_factory=lambda: FsspecDownloaderConfig() ) + + def get_download_path(self, file_data: FileData) -> Optional[Path]: + if not file_data.source_identifiers: + return None + + rel_path = file_data.source_identifiers.relative_path + if not rel_path: + return None + + return generate_hash_based_path(rel_path, self.download_dir) def is_async(self) -> bool: with self.connection_config.get_client(protocol=self.protocol) as client: From 35736b9166565ce21ac4b5221e49611b8a8ebdde Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 16:33:08 -0400 Subject: [PATCH 06/10] temp files --- .../connectors/utils/validation/source.py | 17 +++++++------- .../processes/connectors/fsspec/fsspec.py | 12 ++++++---- unstructured_ingest/utils/filesystem.py | 23 ------------------- 3 files changed, 16 insertions(+), 36 deletions(-) diff --git a/test/integration/connectors/utils/validation/source.py b/test/integration/connectors/utils/validation/source.py index 2f9c97ab8..4529de379 100644 --- a/test/integration/connectors/utils/validation/source.py +++ b/test/integration/connectors/utils/validation/source.py @@ -10,7 +10,6 @@ from test.integration.connectors.utils.validation.utils import ValidationConfig from unstructured_ingest.data_types.file_data import FileData from unstructured_ingest.interfaces import Downloader, Indexer -from unstructured_ingest.utils.filesystem import 
generate_hash_based_path NONSTANDARD_METADATA_FIELDS = { "additional_metadata.@microsoft.graph.downloadUrl": [ @@ -174,15 +173,15 @@ def run_directory_structure_validation(expected_output_dir: Path, download_files with s3_keys_file.open("r") as f: s3_keys = json.load(f)["s3_keys"] - expected_paths = [] - for s3_key in s3_keys: - hash_based_path = generate_hash_based_path(s3_key) - if hash_based_path: - expected_paths.append(str(hash_based_path)) + expected_filenames = [Path(s3_key).name for s3_key in s3_keys] + actual_filenames = [Path(download_file).name for download_file in download_files] - expected_paths.sort() - download_files.sort() - assert expected_paths == download_files + expected_filenames.sort() + actual_filenames.sort() + assert expected_filenames == actual_filenames, ( + f"Expected filenames: {expected_filenames}, " + f"Got filenames: {actual_filenames}" + ) else: directory_record = expected_output_dir / "directory_structure.json" with directory_record.open("r") as f: diff --git a/unstructured_ingest/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/processes/connectors/fsspec/fsspec.py index a483d70b9..4f2149f66 100644 --- a/unstructured_ingest/processes/connectors/fsspec/fsspec.py +++ b/unstructured_ingest/processes/connectors/fsspec/fsspec.py @@ -29,7 +29,7 @@ UploaderConfig, ) from unstructured_ingest.processes.connectors.fsspec.utils import sterilize_dict -from unstructured_ingest.utils.filesystem import generate_hash_based_path, mkdir_concurrent_safe +from unstructured_ingest.utils.filesystem import mkdir_concurrent_safe if TYPE_CHECKING: from fsspec import AbstractFileSystem @@ -275,11 +275,15 @@ def get_download_path(self, file_data: FileData) -> Optional[Path]: if not file_data.source_identifiers: return None - rel_path = file_data.source_identifiers.relative_path - if not rel_path: + filename = file_data.source_identifiers.filename + if not filename: return None - return generate_hash_based_path(rel_path, self.download_dir) + 
temp_dir = tempfile.mkdtemp( + prefix="unstructured_", + dir=self.download_dir + ) + return Path(temp_dir) / filename def is_async(self) -> bool: with self.connection_config.get_client(protocol=self.protocol) as client: diff --git a/unstructured_ingest/utils/filesystem.py b/unstructured_ingest/utils/filesystem.py index 861d4a450..c7933ced6 100644 --- a/unstructured_ingest/utils/filesystem.py +++ b/unstructured_ingest/utils/filesystem.py @@ -5,9 +5,7 @@ when multiple processes operate on the same directory structures simultaneously. """ -import hashlib from pathlib import Path -from typing import Optional def mkdir_concurrent_safe(path: Path) -> None: @@ -29,24 +27,3 @@ def mkdir_concurrent_safe(path: Path) -> None: raise -def generate_hash_based_path(s3_key: str, base_dir: Optional[Path] = None) -> Optional[Path]: - """ - Generate hash-based download path to prevent S3 path conflicts. - - Prevents conflicts when S3's flat namespace is mapped to hierarchical filesystems. - E.g., 'foo' (file) and 'foo/document' (needs foo as directory) would conflict. 
- """ - if not s3_key: - return None - - normalized_path = s3_key.lstrip("/") - if not normalized_path or not normalized_path.strip(): - return None - - filename = Path(normalized_path).name - if not filename: - return None - - dir_hash = hashlib.sha256(normalized_path.encode('utf-8')).hexdigest()[:12] - relative_path = Path(dir_hash) / filename - return base_dir / relative_path if base_dir else relative_path \ No newline at end of file From 966fddabf6303a76959bcd395a0f9d52e72db0f8 Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 16:47:04 -0400 Subject: [PATCH 07/10] fix --- unstructured_ingest/processes/connectors/fsspec/fsspec.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/unstructured_ingest/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/processes/connectors/fsspec/fsspec.py index 4f2149f66..0004b0966 100644 --- a/unstructured_ingest/processes/connectors/fsspec/fsspec.py +++ b/unstructured_ingest/processes/connectors/fsspec/fsspec.py @@ -279,10 +279,7 @@ def get_download_path(self, file_data: FileData) -> Optional[Path]: if not filename: return None - temp_dir = tempfile.mkdtemp( - prefix="unstructured_", - dir=self.download_dir - ) + temp_dir = tempfile.mkdtemp(prefix="unstructured_") return Path(temp_dir) / filename def is_async(self) -> bool: From 7dd4231943a9e170e7ef95e7c2340d1624b04471 Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 16:56:48 -0400 Subject: [PATCH 08/10] mkdir and download dir --- unstructured_ingest/processes/connectors/fsspec/fsspec.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/unstructured_ingest/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/processes/connectors/fsspec/fsspec.py index 0004b0966..6358f38c5 100644 --- a/unstructured_ingest/processes/connectors/fsspec/fsspec.py +++ b/unstructured_ingest/processes/connectors/fsspec/fsspec.py @@ -279,7 +279,12 @@ def get_download_path(self, file_data: FileData) -> Optional[Path]: if not 
filename: return None - temp_dir = tempfile.mkdtemp(prefix="unstructured_") + mkdir_concurrent_safe(self.download_dir) + + temp_dir = tempfile.mkdtemp( + prefix="unstructured_", + dir=self.download_dir + ) return Path(temp_dir) / filename def is_async(self) -> bool: From bc8e2ed484d359938edb93029422b41a171f985b Mon Sep 17 00:00:00 2001 From: CyMule Date: Wed, 30 Jul 2025 18:27:25 -0400 Subject: [PATCH 09/10] remove line --- unstructured_ingest/utils/filesystem.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/unstructured_ingest/utils/filesystem.py b/unstructured_ingest/utils/filesystem.py index c7933ced6..8cc97d96a 100644 --- a/unstructured_ingest/utils/filesystem.py +++ b/unstructured_ingest/utils/filesystem.py @@ -25,5 +25,3 @@ def mkdir_concurrent_safe(path: Path) -> None: except FileExistsError: if not (path.exists() and path.is_dir()): raise - - From ed2a8372629f4c5116a4af91c94d9070433593d0 Mon Sep 17 00:00:00 2001 From: CyMule Date: Thu, 31 Jul 2025 08:41:56 -0400 Subject: [PATCH 10/10] address feedback --- .../integration/connectors/utils/validation/source.py | 10 ++++------ .../processes/connectors/fsspec/fsspec.py | 11 +++++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/test/integration/connectors/utils/validation/source.py b/test/integration/connectors/utils/validation/source.py index 4529de379..e9db9bb04 100644 --- a/test/integration/connectors/utils/validation/source.py +++ b/test/integration/connectors/utils/validation/source.py @@ -173,14 +173,12 @@ def run_directory_structure_validation(expected_output_dir: Path, download_files with s3_keys_file.open("r") as f: s3_keys = json.load(f)["s3_keys"] - expected_filenames = [Path(s3_key).name for s3_key in s3_keys] - actual_filenames = [Path(download_file).name for download_file in download_files] + expected_filenames = {Path(s3_key).name for s3_key in s3_keys} + actual_filenames = {Path(download_file).name for download_file in download_files} - expected_filenames.sort() - 
actual_filenames.sort() assert expected_filenames == actual_filenames, ( - f"Expected filenames: {expected_filenames}, " - f"Got filenames: {actual_filenames}" + f"Expected filenames: {sorted(expected_filenames)}, " + f"Got filenames: {sorted(actual_filenames)}" ) else: directory_record = expected_output_dir / "directory_structure.json" diff --git a/unstructured_ingest/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/processes/connectors/fsspec/fsspec.py index 6358f38c5..e6b3122c8 100644 --- a/unstructured_ingest/processes/connectors/fsspec/fsspec.py +++ b/unstructured_ingest/processes/connectors/fsspec/fsspec.py @@ -264,6 +264,8 @@ class FsspecDownloaderConfig(DownloaderConfig): @dataclass class FsspecDownloader(Downloader): + TEMP_DIR_PREFIX = "unstructured_" + protocol: str connection_config: FsspecConnectionConfigT connector_type: str = CONNECTOR_TYPE @@ -272,17 +274,18 @@ class FsspecDownloader(Downloader): ) def get_download_path(self, file_data: FileData) -> Optional[Path]: - if not file_data.source_identifiers: + has_source_identifiers = file_data.source_identifiers is not None + has_filename = has_source_identifiers and file_data.source_identifiers.filename + + if not (has_source_identifiers and has_filename): return None filename = file_data.source_identifiers.filename - if not filename: - return None mkdir_concurrent_safe(self.download_dir) temp_dir = tempfile.mkdtemp( - prefix="unstructured_", + prefix=self.TEMP_DIR_PREFIX, dir=self.download_dir ) return Path(temp_dir) / filename