Skip to content

Commit 672950b

Browse files
Remove cachetools dependency (#5352)
Cachetools 7 introduced several breaking changes that made @cachedmethod significantly stricter, including converting it into a descriptor and requiring instances to have a mutable __dict__. These assumptions do not align well with Kedro’s ParallelRunner, which relies on pickling and unpickling dataset instances across processes, and this resulted in runtime failures when upgrading to cachetools 7. Although pinning the dependency avoided the immediate issue, further investigation showed that cachetools is not well-suited to this use case. Kedro only caches two values per dataset instance (the resolved load and save versions) and does not require eviction policies, TTLs, or multi-key caching. As a long-term solution, this change removes the cachetools dependency and replaces it with lightweight manual caching. This simplifies the implementation, removes an unnecessary dependency, and avoids future breakages in parallel execution.
1 parent 37ec20e commit 672950b

File tree

5 files changed

+55
-23
lines changed

5 files changed

+55
-23
lines changed

.github/workflows/e2e-tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ jobs:
3333
activate-environment: true
3434
- name: Install dependencies
3535
run: |
36+
# TEMP: For passing tests, will be removed before merge
37+
uv pip install "git+https://github.com/kedro-org/kedro-plugins.git@fix/remove-cachetools-deps#subdirectory=kedro-datasets"
3638
make install-test-requirements
3739
uv pip install pip
3840
make install-pre-commit

.github/workflows/unit-tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ jobs:
3030
activate-environment: true
3131
- name: Install dependencies
3232
run: |
33+
# TEMP: For passing tests, will be removed before merge
34+
uv pip install "git+https://github.com/kedro-org/kedro-plugins.git@fix/remove-cachetools-deps#subdirectory=kedro-datasets"
3335
make install-test-requirements
3436
make install-pre-commit
3537
- name: pip freeze

kedro/io/core.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
import warnings
1313
from collections import namedtuple
1414
from datetime import datetime, timezone
15-
from functools import partial, wraps
15+
from functools import wraps
1616
from glob import iglob
1717
from inspect import getcallargs
18-
from operator import attrgetter
1918
from pathlib import Path, PurePath, PurePosixPath
2019
from typing import ( # noqa: UP035
2120
TYPE_CHECKING,
@@ -30,8 +29,6 @@
3029
runtime_checkable,
3130
)
3231

33-
from cachetools import Cache, cachedmethod
34-
from cachetools.keys import hashkey
3532
from typing_extensions import Self
3633

3734
# These are re-exported for backward compatibility
@@ -734,13 +731,23 @@ def __init__(
734731
self._version = version
735732
self._exists_function = exists_function or _local_exists
736733
self._glob_function = glob_function or iglob
737-
# 1 entry for load version, 1 for save version
738-
self._version_cache = Cache(maxsize=2) # type: Cache
734+
self._cached_load_version: str | None = None
735+
self._cached_save_version: str | None = None
736+
737+
def _clear_version_cache(self) -> None:
738+
"""Clear both load and save version caches."""
739+
self._cached_load_version = None
740+
self._cached_save_version = None
739741

740-
# 'key' is set to prevent cache key overlapping for load and save:
741-
# https://cachetools.readthedocs.io/en/stable/#cachetools.cachedmethod
742-
@cachedmethod(cache=attrgetter("_version_cache"), key=partial(hashkey, "load"))
743742
def _fetch_latest_load_version(self) -> str:
743+
"""Fetch the most recent existing version from the given path.
744+
Results are cached to avoid repeated filesystem operations.
745+
"""
746+
747+
# Return cached version if available
748+
if self._cached_load_version is not None:
749+
return self._cached_load_version
750+
744751
# When load version is unpinned, fetch the most recent existing
745752
# version from the given path.
746753
pattern = str(self._get_versioned_path("*"))
@@ -758,14 +765,20 @@ def _fetch_latest_load_version(self) -> str:
758765
if not most_recent:
759766
message = f"Did not find any versions for {self}"
760767
raise VersionNotFoundError(message)
761-
return PurePath(most_recent).parent.name
762768

763-
# 'key' is set to prevent cache key overlapping for load and save:
764-
# https://cachetools.readthedocs.io/en/stable/#cachetools.cachedmethod
765-
@cachedmethod(cache=attrgetter("_version_cache"), key=partial(hashkey, "save"))
769+
# Cache and return the result
770+
self._cached_load_version = PurePath(most_recent).parent.name
771+
return self._cached_load_version
772+
766773
def _fetch_latest_save_version(self) -> str:
767774
"""Generate and cache the current save version"""
768-
return generate_timestamp()
775+
# Return cached version if available
776+
if self._cached_save_version is not None:
777+
return self._cached_save_version
778+
779+
# Generate new timestamp and cache it
780+
self._cached_save_version = generate_timestamp()
781+
return self._cached_save_version
769782

770783
def resolve_load_version(self) -> str | None:
771784
"""Compute the version the dataset should be loaded with."""
@@ -818,7 +831,7 @@ def _save_wrapper(
818831

819832
@wraps(save_func)
820833
def save(self: Self, data: _DI) -> None:
821-
self._version_cache.clear()
834+
self._clear_version_cache()
822835
save_version = (
823836
self.resolve_save_version()
824837
) # Make sure last save version is set
@@ -844,7 +857,7 @@ def save(self: Self, data: _DI) -> None:
844857
warnings.warn(
845858
_CONSISTENCY_WARNING.format(save_version, load_version, str(self))
846859
)
847-
self._version_cache.clear()
860+
self._clear_version_cache()
848861

849862
return save
850863

@@ -870,7 +883,7 @@ def exists(self) -> bool:
870883

871884
def _release(self) -> None:
872885
super()._release()
873-
self._version_cache.clear()
886+
self._clear_version_cache()
874887

875888

876889
def get_protocol_and_path(

pyproject.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ requires-python = ">=3.10"
1414
dependencies = [
1515
"attrs>=21.3",
1616
"build>=0.7.0",
17-
"cachetools>=4.1,<7.0", # Pinned because cachetools 7 breaks Kedro multiprocessing
1817
"click>=8.2",
1918
"cookiecutter>=2.1.1,<3.0",
2019
"dynaconf>=3.1.2,<4.0",
@@ -73,7 +72,6 @@ test = [
7372
# mypy related dependencies
7473
"pandas-stubs",
7574
"types-PyYAML",
76-
"types-cachetools",
7775
"types-requests",
7876
]
7977
docs = [

tests/io/test_core.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -598,11 +598,28 @@ def test_versioning_existing_dataset(
598598
assert my_versioned_dataset.exists()
599599

600600
def test_cache_release(self, my_versioned_dataset):
601-
my_versioned_dataset._version_cache["index"] = "value"
602-
assert my_versioned_dataset._version_cache.currsize > 0
601+
# Set cache values
602+
my_versioned_dataset._cached_load_version = "2024-01-01T00.00.00.000Z"
603+
my_versioned_dataset._cached_save_version = "2024-01-02T00.00.00.000Z"
604+
assert my_versioned_dataset._cached_load_version is not None
605+
assert my_versioned_dataset._cached_save_version is not None
603606

607+
# Release should clear the cache
604608
my_versioned_dataset._release()
605-
assert my_versioned_dataset._version_cache.currsize == 0
609+
assert my_versioned_dataset._cached_load_version is None
610+
assert my_versioned_dataset._cached_save_version is None
611+
612+
def test_clear_version_cache(self, my_versioned_dataset):
613+
# Set cache values
614+
my_versioned_dataset._cached_load_version = "2024-01-01T00.00.00.000Z"
615+
my_versioned_dataset._cached_save_version = "2024-01-02T00.00.00.000Z"
616+
assert my_versioned_dataset._cached_load_version is not None
617+
assert my_versioned_dataset._cached_save_version is not None
618+
619+
# Clear cache should reset both to None
620+
my_versioned_dataset._clear_version_cache()
621+
assert my_versioned_dataset._cached_load_version is None
622+
assert my_versioned_dataset._cached_save_version is None
606623

607624
def test_fetch_latest_load_version_success(self, my_versioned_dataset, mocker):
608625
mock_get_versioned_path = mocker.patch(
@@ -622,7 +639,7 @@ def test_fetch_latest_load_version_success(self, my_versioned_dataset, mocker):
622639
)
623640

624641
# Ensure the cache is empty for the test
625-
my_versioned_dataset._version_cache = {}
642+
my_versioned_dataset._cached_load_version = None
626643
result = my_versioned_dataset._fetch_latest_load_version()
627644

628645
mock_get_versioned_path.assert_called_once_with("*")

0 commit comments

Comments
 (0)