Skip to content

Commit 70c03d9

Browse files
authored
fix: resolve mkdir race condition in concurrent operations (#551)
1 parent 7e52201 commit 70c03d9

File tree

6 files changed

+42
-4
lines changed

6 files changed

+42
-4
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.0.52
2+
3+
* **Fix mkdir race condition in concurrent operations**
4+
15
## 1.0.51
26

37
* **Fix SharePoint connector UnboundLocalError when site not found**

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.0.51" # pragma: no cover
1+
__version__ = "1.0.52" # pragma: no cover

unstructured_ingest/processes/connectors/fsspec/fsspec.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
from unstructured_ingest.logger import logger
3232
from unstructured_ingest.processes.connectors.fsspec.utils import sterilize_dict
33+
from unstructured_ingest.utils.filesystem import mkdir_concurrent_safe
3334

3435
if TYPE_CHECKING:
3536
from fsspec import AbstractFileSystem
@@ -270,7 +271,7 @@ def wrap_error(self, e: Exception) -> Exception:
270271

271272
def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
272273
download_path = self.get_download_path(file_data=file_data)
273-
download_path.parent.mkdir(parents=True, exist_ok=True)
274+
mkdir_concurrent_safe(download_path.parent)
274275
try:
275276
rpath = file_data.additional_metadata["original_file_path"]
276277
with self.connection_config.get_client(protocol=self.protocol) as client:
@@ -282,7 +283,7 @@ def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
282283

283284
async def async_run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
284285
download_path = self.get_download_path(file_data=file_data)
285-
download_path.parent.mkdir(parents=True, exist_ok=True)
286+
mkdir_concurrent_safe(download_path.parent)
286287
try:
287288
rpath = file_data.additional_metadata["original_file_path"]
288289
with self.connection_config.get_client(protocol=self.protocol) as client:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Utility functions for unstructured-ingest."""
2+
3+
from unstructured_ingest.utils.filesystem import mkdir_concurrent_safe
4+
5+
__all__ = ["mkdir_concurrent_safe"]

unstructured_ingest/utils/compression.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Optional
77

88
from unstructured_ingest.logger import logger
9+
from unstructured_ingest.utils.filesystem import mkdir_concurrent_safe
910

1011
ZIP_FILE_EXT = [".zip"]
1112
TAR_FILE_EXT = [".tar", ".tar.gz", ".tgz"]
@@ -17,7 +18,7 @@ def uncompress_file(filename: str, path: Optional[str] = None) -> str:
1718
"""
1819
# Create path if it doesn't already exist
1920
if path:
20-
Path(path).mkdir(parents=True, exist_ok=True)
21+
mkdir_concurrent_safe(Path(path))
2122

2223
if any(filename.endswith(ext) for ext in ZIP_FILE_EXT):
2324
return uncompress_zip_file(zip_filename=filename, path=path)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
2+
Filesystem utilities for concurrent operations.
3+
4+
This module provides race-condition-safe filesystem operations that are needed
5+
when multiple processes operate on the same directory structures simultaneously.
6+
"""
7+
8+
from pathlib import Path
9+
10+
11+
def mkdir_concurrent_safe(path: Path) -> None:
12+
"""
13+
Create directory safely in concurrent environments, handling race conditions.
14+
15+
This addresses the issue where Path.mkdir(parents=True, exist_ok=True) can still
16+
raise FileExistsError when multiple processes attempt to create overlapping
17+
directory structures simultaneously. In this codebase, this occurs when multiple
18+
files are being downloaded in parallel and archive extraction is happening in parallel.
19+
20+
Related: https://github.com/python/cpython/pull/112966/files
21+
Python core team used the same approach to fix zipfile race conditions.
22+
"""
23+
try:
24+
path.mkdir(parents=True, exist_ok=True)
25+
except FileExistsError:
26+
if not (path.exists() and path.is_dir()):
27+
raise

0 commit comments

Comments
 (0)