Skip to content

Commit 5ada081

Browse files
authored
refactor: Give more instance caching flexibility to the storage clients (#1425)
### Description - The `StorageClient` type is no longer used as a standalone storage cache key; instead, it is incorporated into an additional cache key (which can be overridden). - This gives `StorageClient`s better control over caching. ### Issues - Enables: apify/apify-sdk-python#513 ### Testing - Unit tests
1 parent 403ef00 commit 5ada081

File tree

6 files changed

+56
-79
lines changed

6 files changed

+56
-79
lines changed

src/crawlee/storage_clients/_base/_storage_client.py

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -30,12 +30,13 @@ class StorageClient(ABC):
3030
(where applicable), and consistent access patterns across all storage types it supports.
3131
"""
3232

33-
def get_additional_cache_key(self, configuration: Configuration) -> Hashable: # noqa: ARG002
34-
"""Return a cache key that can differentiate between different storages of this client.
33+
def get_storage_client_cache_key(self, configuration: Configuration) -> Hashable: # noqa: ARG002
34+
"""Return a cache key that can differentiate between different storages of this and other clients.
3535
36-
Can be based on configuration or on the client itself. By default, returns an empty string.
36+
Can be based on configuration or on the client itself. By default, returns a module and name of the client
37+
class.
3738
"""
38-
return ''
39+
return f'{self.__class__.__module__}.{self.__class__.__name__}'
3940

4041
@abstractmethod
4142
async def create_dataset_client(

src/crawlee/storage_clients/_file_system/_storage_client.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,9 +35,9 @@ class FileSystemStorageClient(StorageClient):
3535
"""
3636

3737
@override
38-
def get_additional_cache_key(self, configuration: Configuration) -> Hashable:
38+
def get_storage_client_cache_key(self, configuration: Configuration) -> Hashable:
3939
# Even different client instances should return same storage if the storage_dir is the same.
40-
return configuration.storage_dir
40+
return super().get_storage_client_cache_key(configuration), configuration.storage_dir
4141

4242
@override
4343
async def create_dataset_client(

src/crawlee/storages/_dataset.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -110,16 +110,15 @@ async def open(
110110
client_opener_coro = storage_client.create_dataset_client(
111111
id=id, name=name, alias=alias, configuration=configuration
112112
)
113-
additional_cache_key = storage_client.get_additional_cache_key(configuration=configuration)
113+
storage_client_cache_key = storage_client.get_storage_client_cache_key(configuration=configuration)
114114

115115
return await service_locator.storage_instance_manager.open_storage_instance(
116116
cls,
117117
id=id,
118118
name=name,
119119
alias=alias,
120120
client_opener_coro=client_opener_coro,
121-
storage_client_type=storage_client.__class__,
122-
additional_cache_key=additional_cache_key,
121+
storage_client_cache_key=storage_client_cache_key,
123122
)
124123

125124
@override

src/crawlee/storages/_key_value_store.py

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -122,16 +122,15 @@ async def open(
122122
client_opener_coro = storage_client.create_kvs_client(
123123
id=id, name=name, alias=alias, configuration=configuration
124124
)
125-
additional_cache_key = storage_client.get_additional_cache_key(configuration=configuration)
125+
additional_cache_key = storage_client.get_storage_client_cache_key(configuration=configuration)
126126

127127
return await service_locator.storage_instance_manager.open_storage_instance(
128128
cls,
129129
id=id,
130130
name=name,
131-
client_opener_coro=client_opener_coro,
132131
alias=alias,
133-
storage_client_type=storage_client.__class__,
134-
additional_cache_key=additional_cache_key,
132+
client_opener_coro=client_opener_coro,
133+
storage_client_cache_key=additional_cache_key,
135134
)
136135

137136
@override

src/crawlee/storages/_request_queue.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -126,16 +126,15 @@ async def open(
126126
storage_client = service_locator.get_storage_client() if storage_client is None else storage_client
127127

128128
client_opener_coro = storage_client.create_rq_client(id=id, name=name, alias=alias, configuration=configuration)
129-
additional_cache_key = storage_client.get_additional_cache_key(configuration=configuration)
129+
additional_cache_key = storage_client.get_storage_client_cache_key(configuration=configuration)
130130

131131
return await service_locator.storage_instance_manager.open_storage_instance(
132132
cls,
133133
id=id,
134134
name=name,
135135
alias=alias,
136136
client_opener_coro=client_opener_coro,
137-
storage_client_type=storage_client.__class__,
138-
additional_cache_key=additional_cache_key,
137+
storage_client_cache_key=additional_cache_key,
139138
)
140139

141140
@override

src/crawlee/storages/_storage_instance_manager.py

Lines changed: 42 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -8,42 +8,56 @@
88
from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient
99

1010
if TYPE_CHECKING:
11-
from crawlee.storage_clients import StorageClient
12-
1311
from ._base import Storage
1412

1513
T = TypeVar('T', bound='Storage')
1614

1715

1816
@dataclass
19-
class _StorageClientCache:
20-
"""Cache for specific storage client.
21-
22-
Example:
23-
Storage=Dataset, id='123', additional_cache_key="some_path" will be located in
24-
storage = by_id[Dataset]['123'][some_path]
25-
"""
17+
class _StorageCache:
18+
"""Cache for storage instances."""
2619

2720
by_id: defaultdict[type[Storage], defaultdict[str, defaultdict[Hashable, Storage]]] = field(
2821
default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict()))
2922
)
30-
"""Cache for storage instances by ID, separated by storage type and additional hash key."""
23+
"""Cache for storage instances by ID. Example: by_id[Dataset]['some_id']['some_additional_cache_key']."""
3124

3225
by_name: defaultdict[type[Storage], defaultdict[str, defaultdict[Hashable, Storage]]] = field(
3326
default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict()))
3427
)
35-
"""Cache for storage instances by name, separated by storage type and additional hash key."""
28+
"""Cache for storage instances by name. Example: by_name[Dataset]['some_name']['some_additional_cache_key']"""
3629

3730
by_alias: defaultdict[type[Storage], defaultdict[str, defaultdict[Hashable, Storage]]] = field(
3831
default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict()))
3932
)
40-
"""Cache for storage instances by alias, separated by storage type and additional hash key."""
33+
"""Cache for storage instances by alias. Example: by_alias[Dataset]['some_alias']['some_additional_cache_key']"""
34+
35+
def remove_from_cache(self, storage_instance: Storage) -> None:
36+
"""Remove a storage instance from the cache.
37+
38+
Args:
39+
storage_instance: The storage instance to remove.
40+
"""
41+
storage_type = type(storage_instance)
42+
43+
# Remove from ID cache
44+
for additional_key in self.by_id[storage_type][storage_instance.id]:
45+
del self.by_id[storage_type][storage_instance.id][additional_key]
46+
break
4147

48+
# Remove from name cache or alias cache. It can never be in both.
49+
if storage_instance.name is not None:
50+
for additional_key in self.by_name[storage_type][storage_instance.name]:
51+
del self.by_name[storage_type][storage_instance.name][additional_key]
52+
break
53+
else:
54+
for alias_key in self.by_alias[storage_type]:
55+
for additional_key in self.by_alias[storage_type][alias_key]:
56+
del self.by_alias[storage_type][alias_key][additional_key]
57+
break
4258

43-
StorageClientType = DatasetClient | KeyValueStoreClient | RequestQueueClient
44-
"""Type alias for the storage client types."""
4559

46-
ClientOpenerCoro = Coroutine[None, None, StorageClientType]
60+
ClientOpenerCoro = Coroutine[None, None, DatasetClient | KeyValueStoreClient | RequestQueueClient]
4761
"""Type alias for the client opener function."""
4862

4963

@@ -58,7 +72,7 @@ class StorageInstanceManager:
5872
"""Reserved alias for default unnamed storage."""
5973

6074
def __init__(self) -> None:
61-
self._cache_by_storage_client: dict[type[StorageClient], _StorageClientCache] = defaultdict(_StorageClientCache)
75+
self._cache: _StorageCache = _StorageCache()
6276

6377
async def open_storage_instance(
6478
self,
@@ -67,9 +81,8 @@ async def open_storage_instance(
6781
id: str | None,
6882
name: str | None,
6983
alias: str | None,
70-
storage_client_type: type[StorageClient],
7184
client_opener_coro: ClientOpenerCoro,
72-
additional_cache_key: Hashable = '',
85+
storage_client_cache_key: Hashable = '',
7386
) -> T:
7487
"""Open a storage instance with caching support.
7588
@@ -78,9 +91,8 @@ async def open_storage_instance(
7891
id: Storage ID.
7992
name: Storage name. (global scope, persists across runs).
8093
alias: Storage alias (run scope, creates unnamed storage).
81-
storage_client_type: Type of storage client to use.
8294
client_opener_coro: Coroutine to open the storage client when storage instance not found in cache.
83-
additional_cache_key: Additional optional key to differentiate cache entries.
95+
storage_client_cache_key: Additional optional key from storage client to differentiate cache entries.
8496
8597
Returns:
8698
The storage instance.
@@ -105,45 +117,31 @@ async def open_storage_instance(
105117
alias = self._DEFAULT_STORAGE_ALIAS
106118

107119
# Check cache
108-
if id is not None and (
109-
cached_instance := self._cache_by_storage_client[storage_client_type]
110-
.by_id[cls][id]
111-
.get(additional_cache_key)
112-
):
120+
if id is not None and (cached_instance := self._cache.by_id[cls][id].get(storage_client_cache_key)):
113121
if isinstance(cached_instance, cls):
114122
return cached_instance
115123
raise RuntimeError('Cached instance type mismatch.')
116124

117-
if name is not None and (
118-
cached_instance := self._cache_by_storage_client[storage_client_type]
119-
.by_name[cls][name]
120-
.get(additional_cache_key)
121-
):
125+
if name is not None and (cached_instance := self._cache.by_name[cls][name].get(storage_client_cache_key)):
122126
if isinstance(cached_instance, cls):
123127
return cached_instance
124128
raise RuntimeError('Cached instance type mismatch.')
125129

126130
if alias is not None and (
127-
cached_instance := self._cache_by_storage_client[storage_client_type]
128-
.by_alias[cls][alias]
129-
.get(additional_cache_key)
131+
cached_instance := self._cache.by_alias[cls][alias].get(storage_client_cache_key)
130132
):
131133
if isinstance(cached_instance, cls):
132134
return cached_instance
133135
raise RuntimeError('Cached instance type mismatch.')
134136

135137
# Check for conflicts between named and alias storages
136-
if alias and (
137-
self._cache_by_storage_client[storage_client_type].by_name[cls][alias].get(additional_cache_key)
138-
):
138+
if alias and (self._cache.by_name[cls][alias].get(storage_client_cache_key)):
139139
raise ValueError(
140140
f'Cannot create alias storage "{alias}" because a named storage with the same name already exists. '
141141
f'Use a different alias or drop the existing named storage first.'
142142
)
143143

144-
if name and (
145-
self._cache_by_storage_client[storage_client_type].by_alias[cls][name].get(additional_cache_key)
146-
):
144+
if name and (self._cache.by_alias[cls][name].get(storage_client_cache_key)):
147145
raise ValueError(
148146
f'Cannot create named storage "{name}" because an alias storage with the same name already exists. '
149147
f'Use a different name or drop the existing alias storage first.'
@@ -160,17 +158,15 @@ async def open_storage_instance(
160158

161159
# Cache the instance.
162160
# Always cache by id.
163-
self._cache_by_storage_client[storage_client_type].by_id[cls][instance.id][additional_cache_key] = instance
161+
self._cache.by_id[cls][instance.id][storage_client_cache_key] = instance
164162

165163
# Cache named storage.
166164
if instance_name is not None:
167-
self._cache_by_storage_client[storage_client_type].by_name[cls][instance_name][additional_cache_key] = (
168-
instance
169-
)
165+
self._cache.by_name[cls][instance_name][storage_client_cache_key] = instance
170166

171167
# Cache unnamed storage.
172168
if alias is not None:
173-
self._cache_by_storage_client[storage_client_type].by_alias[cls][alias][additional_cache_key] = instance
169+
self._cache.by_alias[cls][alias][storage_client_cache_key] = instance
174170

175171
return instance
176172

@@ -185,25 +181,8 @@ def remove_from_cache(self, storage_instance: Storage) -> None:
185181
Args:
186182
storage_instance: The storage instance to remove.
187183
"""
188-
storage_type = type(storage_instance)
189-
190-
for storage_client_cache in self._cache_by_storage_client.values():
191-
# Remove from ID cache
192-
for additional_key in storage_client_cache.by_id[storage_type][storage_instance.id]:
193-
del storage_client_cache.by_id[storage_type][storage_instance.id][additional_key]
194-
break
195-
196-
# Remove from name cache or alias cache. It can never be in both.
197-
if storage_instance.name is not None:
198-
for additional_key in storage_client_cache.by_name[storage_type][storage_instance.name]:
199-
del storage_client_cache.by_name[storage_type][storage_instance.name][additional_key]
200-
break
201-
else:
202-
for alias_key in storage_client_cache.by_alias[storage_type]:
203-
for additional_key in storage_client_cache.by_alias[storage_type][alias_key]:
204-
del storage_client_cache.by_alias[storage_type][alias_key][additional_key]
205-
break
184+
self._cache.remove_from_cache(storage_instance)
206185

207186
def clear_cache(self) -> None:
208187
"""Clear all cached storage instances."""
209-
self._cache_by_storage_client = defaultdict(_StorageClientCache)
188+
self._cache = _StorageCache()

0 commit comments

Comments (0)