Skip to content

Commit 6b07b4a

Browse files
authored
Fix FsspecFileIO.get_fs thread safety (#2495)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change `FsspecFileIO.get_fs` can be called by multiple threads when `ExecutorFactory` is used (for example by `DataScan.plan_files`). The base class of `fsspec` filesystem objects, `fsspec.spec.AbstractFileSystem`, internally caches instances through the `fsspec.spec._Cached` metaclass. The caching key used includes `threading.get_ident()`, making entries thread-local: https://github.com/fsspec/filesystem_spec/blob/f84b99f0d1f079f990db1a219b74df66ab3e7160/fsspec/spec.py#L71 The `FsspecFileIO.get_fs` LRU cache (around `FsspecFileIO._get_fs`) breaks the thread-locality of the filesystem instances as it will return the same instance for different threads. One consequence of this is that for `s3fs.S3FileSystem`, HTTP connection pooling no longer occurs per thread (as is normal with `aiobotocore`), as the `aiobotocore` client object (containing the `aiohttp.ClientSession`) is stored on the `s3fs.S3FileSystem`. This change addresses this by making the `FsspecFileIO.get_fs` cache thread-local. ## Are these changes tested? Tested locally. Unit test included covering the caching behaviour. ## Are there any user-facing changes? Yes - S3 HTTP connection pooling now occurs per-thread, matching the behaviour of `aiobotocore` when it is used in the recommended way with an event loop per thread. <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent 513295d commit 6b07b4a

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

pyiceberg/io/fsspec.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
import logging
2222
import os
23+
import threading
2324
from copy import copy
2425
from functools import lru_cache, partial
2526
from typing import (
@@ -370,7 +371,7 @@ class FsspecFileIO(FileIO):
370371
def __init__(self, properties: Properties):
371372
self._scheme_to_fs = {}
372373
self._scheme_to_fs.update(SCHEME_TO_FS)
373-
self.get_fs: Callable[[str], AbstractFileSystem] = lru_cache(self._get_fs)
374+
self._thread_locals = threading.local()
374375
super().__init__(properties=properties)
375376

376377
def new_input(self, location: str) -> FsspecInputFile:
@@ -416,6 +417,13 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
416417
fs = self.get_fs(uri.scheme)
417418
fs.rm(str_location)
418419

420+
def get_fs(self, scheme: str) -> AbstractFileSystem:
    """Return the filesystem for ``scheme`` from a cache private to the calling thread.

    Each thread lazily receives its own ``lru_cache`` wrapper around
    ``_get_fs``, stored on ``self._thread_locals``, so repeated lookups from
    one thread go through that thread's wrapper only.
    """
    thread_state = self._thread_locals
    try:
        cached_lookup = thread_state.get_fs_cached
    except AttributeError:
        # First call made from this thread: create its own cached wrapper.
        cached_lookup = thread_state.get_fs_cached = lru_cache(self._get_fs)

    return cached_lookup(scheme)
426+
419427
def _get_fs(self, scheme: str) -> AbstractFileSystem:
420428
"""Get a filesystem for a specific scheme."""
421429
if scheme not in self._scheme_to_fs:
@@ -425,10 +433,10 @@ def _get_fs(self, scheme: str) -> AbstractFileSystem:
425433
def __getstate__(self) -> Dict[str, Any]:
426434
"""Create a dictionary of the FsSpecFileIO fields used when pickling."""
427435
fileio_copy = copy(self.__dict__)
428-
fileio_copy["get_fs"] = None
436+
del fileio_copy["_thread_locals"]
429437
return fileio_copy
430438

431439
def __setstate__(self, state: Dict[str, Any]) -> None:
432440
"""Deserialize the state into a FsSpecFileIO instance."""
433441
self.__dict__ = state
434-
self.get_fs = lru_cache(self._get_fs)
442+
self._thread_locals = threading.local()

tests/io/test_fsspec.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
import os
1919
import pickle
2020
import tempfile
21+
import threading
2122
import uuid
23+
from typing import List
2224
from unittest import mock
2325

2426
import pytest
2527
from botocore.awsrequest import AWSRequest
2628
from fsspec.implementations.local import LocalFileSystem
29+
from fsspec.spec import AbstractFileSystem
2730
from requests_mock import Mocker
2831

2932
from pyiceberg.exceptions import SignError
@@ -54,6 +57,42 @@ def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: Fsspe
5457
pytest.fail("Failed to write to file without parent directory")
5558

5659

60+
def test_fsspec_get_fs_instance_per_thread_caching(fsspec_fileio: FsspecFileIO) -> None:
    """Check that `FsspecFileIO.get_fs` reuses an instance within a thread but not across threads"""
    collected: List[AbstractFileSystem] = []
    gates: List[threading.Event] = [threading.Event(), threading.Event()]

    def worker(gate: threading.Event) -> None:
        # Park here until the main thread releases this worker
        gate.wait()

        # Two lookups per thread so that same-thread caching can be checked
        collected.append(fsspec_fileio.get_fs("file"))
        collected.append(fsspec_fileio.get_fs("file"))

    workers = [threading.Thread(target=worker, args=[gate]) for gate in gates]

    # Start every worker up front (each blocks on its own event straight away) as we want to ensure
    # distinct `threading.get_ident()` values, which `fsspec.spec.AbstractFileSystem` uses in its cache keys.
    for running in workers:
        running.start()

    # Release the workers one at a time and wait for each to finish, so that the
    # results land in `collected` in a known order: first thread, then second.
    for gate, running in zip(gates, workers):
        gate.set()
        running.join()

    # Same thread, same instance
    assert collected[0] is collected[1]
    assert collected[2] is collected[3]

    # Different threads, different instances
    assert collected[0] is not collected[2]
94+
95+
5796
@pytest.mark.s3
5897
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
5998
"""Test creating a new input file from a fsspec file-io"""

0 commit comments

Comments
 (0)