
Commit 24af2a2 (parent 9bd3269)

fix: fanoutcache

4 files changed: +49 −108 lines

src/typed_diskcache/core/const.py

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,8 @@
 """The time to sleep in the spin lock."""
 DEFAULT_LOCK_TIMEOUT = 10
 """The default lock timeout."""
+DEFAULT_SIZE_LIMIT = 2**30  # 1gb
+"""The default size limit of the cache in bytes."""
 
 
 ENOVAL: Constant[Literal["ENOVAL"]] = Constant("ENOVAL")
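Note on the new constant: 2**30 is 1,073,741,824 bytes, so the "1gb" comment is a rounded label for one gibibyte rather than 10**9. A minimal standalone check of the value (plain Python, independent of the library):

# value copied from the diff above
DEFAULT_SIZE_LIMIT = 2**30
assert DEFAULT_SIZE_LIMIT == 1_073_741_824  # 1 GiB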

src/typed_diskcache/implement/cache/fanout/main.py

Lines changed: 45 additions & 73 deletions
@@ -8,10 +8,12 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, NoReturn, overload
 
+import anyio
 from sqlalchemy.exc import OperationalError
 from typing_extensions import TypeVar, Unpack, override
 
 from typed_diskcache import exception as te
+from typed_diskcache.core.const import DEFAULT_SIZE_LIMIT
 from typed_diskcache.core.types import (
     Container,
     FilterMethod,
@@ -25,7 +27,6 @@
 from typed_diskcache.implement.cache import utils as cache_utils
 from typed_diskcache.implement.cache.default import Cache as Shard
 from typed_diskcache.implement.cache.fanout import utils as fanout_utils
-from typed_diskcache.implement.cache.utils import init_args
 from typed_diskcache.interface.cache import CacheProtocol
 
 if TYPE_CHECKING:
@@ -66,15 +67,7 @@ class or callable. Default is `None`.
     [`Settings`][typed_diskcache.model.Settings].
     """
 
-    __slots__ = (
-        "_directory",
-        "_disk",
-        "_conn",
-        "_settings",
-        "_page_size",
-        "_shard_size",
-        "_shards",
-    )
+    __slots__ = ("_shards", "_cache")
 
     def __init__(
         self,
@@ -91,31 +84,26 @@ def __init__(
         directory = directory.expanduser()
         directory = Path(expandvars(directory))
 
-        disk, conn, settings, page_size = init_args(
-            directory, disk_type, disk_args, timeout, **kwargs
-        )
-
-        settings = settings.model_copy(
-            update={"size_limit": settings.size_limit // shard_size}
-        )
+        size_limit = kwargs.get("size_limit", DEFAULT_SIZE_LIMIT)
+        kwargs["size_limit"] = size_limit // shard_size
 
-        self._shard_size = shard_size
-        self._directory = directory
-        self._disk = disk
-        self._conn = conn
-        self._settings = settings
-        self._page_size = page_size
-        self._shards: tuple[Shard, ...] = tuple(
-            object.__new__(Shard) for _ in range(shard_size)
-        )
-        fanout_utils.update_shards_state(
-            self._shards,
-            self.directory,
-            self.disk,
-            self.conn,
-            self.settings,
-            self._page_size,
+        self._cache = Shard(
+            directory,
+            disk_type=disk_type,
+            disk_args=disk_args,
+            timeout=timeout,
+            **kwargs,
         )
+        self._shards = tuple([
+            Shard(
+                directory / f"{index:03d}",
+                disk_type=disk_type,
+                disk_args=disk_args,
+                timeout=timeout,
+                **kwargs,
+            )
+            for index in range(shard_size)
+        ])
 
     @override
     def __len__(self) -> int:
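The constructor now builds fully initialized Shard instances, one parent cache plus one sub-cache per numbered shard directory, instead of patching object.__new__ blanks after the fact, and it divides the requested size_limit evenly across shards before forwarding the kwargs. A sketch of that arithmetic with an assumed shard count (the example values are illustrations, not library defaults):

# assumed example values; only the // split comes from the diff above
size_limit = 2**30         # caller-requested total budget, 1 GiB
shard_size = 8             # hypothetical shard count
per_shard = size_limit // shard_size
assert per_shard == 2**27  # each shard is capped at 128 MiB

Because // floors, the per-shard budgets can sum to slightly less than the requested total whenever size_limit is not a multiple of shard_size.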
@@ -164,60 +152,44 @@ def __getstate__(self) -> Mapping[str, Any]:
         import cloudpickle
 
         return {
-            "shard_size": self._shard_size,
-            "directory": str(self.directory),
-            "disk": cloudpickle.dumps(self.disk),
-            "conn": cloudpickle.dumps(self.conn),
-            "settings": self.settings.model_dump_json(),
-            "page_size": self._page_size,
+            "shards": cloudpickle.dumps(self._shards),
+            "cache": cloudpickle.dumps(self._cache),
         }
 
     @override
     def __setstate__(self, state: Mapping[str, Any]) -> None:
         import cloudpickle
 
-        from typed_diskcache.model import Settings
-
-        self._shard_size = state["shard_size"]
-        self._directory = Path(state["directory"])
-        self._disk = cloudpickle.loads(state["disk"])
-        self._conn = cloudpickle.loads(state["conn"])
-        self._settings = Settings.model_validate_json(state["settings"])
-        self._page_size = state["page_size"]
-        self._shards = tuple(object.__new__(Shard) for _ in range(self._shard_size))
-        fanout_utils.update_shards_state(
-            self._shards,
-            self._directory,
-            self._disk,
-            self.conn,
-            self.settings,
-            self._page_size,
-        )
+        cache: Shard = cloudpickle.loads(state["cache"])
+        shards: tuple[Shard] = cloudpickle.loads(state["shards"])
+
+        self._cache = cache
+        self._shards = shards
 
     @property
     @override
     def directory(self) -> Path:
-        return self._directory
+        return self._cache.directory
 
     @property
     @override
     def timeout(self) -> float:
-        return self._conn.timeout
+        return self._cache.timeout
 
     @property
     @override
     def conn(self) -> Connection:
-        return self._conn
+        return self._cache.conn
 
     @property
     @override
     def disk(self) -> DiskProtocol:
-        return self._disk
+        return self._cache.disk
 
     @property
     @override
     def settings(self) -> Settings:
-        return self._settings
+        return self._cache.settings
 
     @settings.setter
     def settings(self, value: Settings) -> None:
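Pickling now round-trips two cloudpickle blobs ("cache" and "shards") instead of reassembling the connection, disk, and settings field by field, and every property delegates to the parent _cache. A runnable stand-in for the state pattern, with stdlib pickle substituting for cloudpickle and a toy class in place of the real one:

import pickle

class StateOnly:
    # mimics the __getstate__/__setstate__ pair above: serialize the
    # members as one nested blob, restore by loading it back
    def __init__(self, shards: tuple[int, ...]) -> None:
        self.shards = shards

    def __getstate__(self) -> dict:
        return {"shards": pickle.dumps(self.shards)}

    def __setstate__(self, state: dict) -> None:
        self.shards = pickle.loads(state["shards"])

restored = pickle.loads(pickle.dumps(StateOnly((1, 2, 3))))
assert restored.shards == (1, 2, 3)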
@@ -371,10 +343,15 @@ async def astats(self, *, enable: bool = True, reset: bool = False) -> Stats:
     @override
     def close(self) -> None:
         self.conn.close()
+        for shard in self._shards:
+            shard.close()
 
     @override
     async def aclose(self) -> None:
         await self.conn.aclose()
+        async with anyio.create_task_group() as task_group:
+            for shard in self._shards:
+                task_group.start_soon(shard.aclose)
 
     @override
     def touch(
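close walks the shards sequentially, while aclose fans out through an anyio task group, so every shard's aclose runs concurrently and the async with block waits for all of them before returning. A standalone sketch of the same pattern (toy class, not library code):

import anyio

class FakeShard:
    async def aclose(self) -> None:
        await anyio.sleep(0.01)  # stand-in for real cleanup work

async def close_all(shards: list[FakeShard]) -> None:
    # every aclose() is started immediately; the task group joins them all
    async with anyio.create_task_group() as task_group:
        for shard in shards:
            task_group.start_soon(shard.aclose)

anyio.run(close_all, [FakeShard() for _ in range(4)])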
@@ -709,18 +686,13 @@ async def apeekitem(self, *, last: bool = True, retry: bool = False) -> NoReturn
 
     @override
     def update_settings(self, settings: Settings) -> None:
-        first_shard = next(iter(self._shards))
-        first_shard.update_settings(settings)
-        settings = first_shard.settings
-        for shard in self._shards[1:]:
-            shard._settings = settings  # noqa: SLF001
-        self._settings = settings
+        for shard in self._shards:
+            shard.update_settings(settings)
+        self._cache.update_settings(settings)
 
     @override
     async def aupdate_settings(self, settings: Settings) -> None:
-        first_shard = next(iter(self._shards))
-        await first_shard.aupdate_settings(settings)
-        settings = first_shard.settings
-        for shard in self._shards[1:]:
-            shard._settings = settings  # noqa: SLF001
-        self._settings = settings
+        async with anyio.create_task_group() as task_group:
+            for shard in self._shards:
+                task_group.start_soon(shard.aupdate_settings, settings)
+            task_group.start_soon(self._cache.aupdate_settings, settings)

src/typed_diskcache/implement/cache/fanout/utils.py

Lines changed: 0 additions & 34 deletions
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-from copy import copy
 from typing import TYPE_CHECKING, Any
 
 from typing_extensions import ParamSpec, TypeAlias, TypeVar
@@ -11,13 +10,10 @@
 if TYPE_CHECKING:
     from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable
     from os import PathLike
-    from pathlib import Path
 
-    from typed_diskcache.database import Connection
     from typed_diskcache.implement.cache.default import Cache
     from typed_diskcache.interface.cache import CacheProtocol
     from typed_diskcache.interface.disk import DiskProtocol
-    from typed_diskcache.model import Settings
 
 __all__ = []
 
@@ -82,33 +78,3 @@ async def async_loop_total(
     while flag:
         total, flag = await async_loop_count(total, func, *args, **kwargs)
     return total
-
-
-def update_shards_state(  # noqa: PLR0913
-    shards: tuple[Cache, ...],
-    directory: Path,
-    disk: DiskProtocol,
-    conn: Connection,
-    settings: Settings,
-    page_size: int,
-) -> None:
-    for index, shard in enumerate(shards):
-        update_shard_state(index, shard, directory, disk, conn, settings, page_size)
-
-
-def update_shard_state(  # noqa: PLR0913
-    index: int,
-    shard: Cache,
-    directory: Path,
-    disk: DiskProtocol,
-    conn: Connection,
-    settings: Settings,
-    page_size: int,
-) -> None:
-    shard._directory = directory / f"{index:03d}"  # noqa: SLF001
-    shard._directory.mkdir(parents=True, exist_ok=True)  # noqa: SLF001
-    shard._disk = copy(disk)  # noqa: SLF001
-    shard._disk.directory = shard.directory  # noqa: SLF001
-    shard._conn = conn  # noqa: SLF001
-    shard._settings = settings  # noqa: SLF001
-    shard._page_size = page_size  # noqa: SLF001

src/typed_diskcache/model.py

Lines changed: 2 additions & 1 deletion
@@ -5,6 +5,7 @@
 
 from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, Json, JsonValue
 
+from typed_diskcache.core.const import DEFAULT_SIZE_LIMIT
 from typed_diskcache.core.types import (
     EvictionPolicy,
     SQLiteAutoVacuum,
@@ -58,7 +59,7 @@ class Settings(BaseModel):
 
     statistics: bool = False
     eviction_policy: EvictionPolicy = EvictionPolicy.LEAST_RECENTLY_STORED
-    size_limit: int = 2**30
+    size_limit: int = DEFAULT_SIZE_LIMIT
     cull_limit: int = 10
     serialized_disk: Annotated[
         tuple[str, dict[str, JsonValue]] | Json[tuple[str, dict[str, JsonValue]]],
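With the pydantic default sourced from the same constant the fanout constructor reads, the two defaults can no longer drift apart. A minimal reproduction of the pattern (standalone pydantic model, not the library's own Settings):

from pydantic import BaseModel

DEFAULT_SIZE_LIMIT = 2**30  # shared constant, mirroring core/const.py

class Settings(BaseModel):
    size_limit: int = DEFAULT_SIZE_LIMIT

assert Settings().size_limit == 2**30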
