Skip to content

Commit aa22464

Browse files
committed
Caching of Dataset and KVS
1 parent 86380e4 commit aa22464

File tree

10 files changed

+120
-125
lines changed

10 files changed

+120
-125
lines changed

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import datetime, timezone
77
from logging import getLogger
88
from pathlib import Path
9-
from typing import TYPE_CHECKING
9+
from typing import TYPE_CHECKING, ClassVar
1010

1111
from pydantic import ValidationError
1212
from typing_extensions import override
@@ -23,9 +23,6 @@
2323

2424
logger = getLogger(__name__)
2525

26-
_cache_by_name = dict[str, 'FileSystemDatasetClient']()
27-
"""A dictionary to cache clients by their names."""
28-
2926

3027
class FileSystemDatasetClient(DatasetClient):
3128
"""A file system implementation of the dataset client.
@@ -44,6 +41,9 @@ class FileSystemDatasetClient(DatasetClient):
4441
_LOCAL_ENTRY_NAME_DIGITS = 9
4542
"""Number of digits used for the file names (e.g., 000000019.json)."""
4643

44+
_cache_by_name: ClassVar[dict[str, FileSystemDatasetClient]] = {}
45+
"""A dictionary to cache clients by their names."""
46+
4747
def __init__(
4848
self,
4949
*,
@@ -131,8 +131,8 @@ async def open(
131131
name = name or cls._DEFAULT_NAME
132132

133133
# Check if the client is already cached by name.
134-
if name in _cache_by_name:
135-
client = _cache_by_name[name]
134+
if name in cls._cache_by_name:
135+
client = cls._cache_by_name[name]
136136
await client._update_metadata(update_accessed_at=True) # noqa: SLF001
137137
return client
138138

@@ -182,7 +182,7 @@ async def open(
182182
await client._update_metadata()
183183

184184
# Cache the client by name.
185-
_cache_by_name[name] = client
185+
cls._cache_by_name[name] = client
186186

187187
return client
188188

@@ -194,8 +194,8 @@ async def drop(self) -> None:
194194
await asyncio.to_thread(shutil.rmtree, self.path_to_dataset)
195195

196196
# Remove the client from the cache.
197-
if self.name in _cache_by_name:
198-
del _cache_by_name[self.name]
197+
if self.name in self.__class__._cache_by_name: # noqa: SLF001
198+
del self.__class__._cache_by_name[self.name] # noqa: SLF001
199199

200200
@override
201201
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import datetime, timezone
77
from logging import getLogger
88
from pathlib import Path
9-
from typing import TYPE_CHECKING, Any
9+
from typing import TYPE_CHECKING, Any, ClassVar
1010

1111
from pydantic import ValidationError
1212
from typing_extensions import override
@@ -24,9 +24,6 @@
2424

2525
logger = getLogger(__name__)
2626

27-
_cache_by_name = dict[str, 'FileSystemKeyValueStoreClient']()
28-
"""A dictionary to cache clients by their names."""
29-
3027

3128
class FileSystemKeyValueStoreClient(KeyValueStoreClient):
3229
"""A file system implementation of the key-value store client.
@@ -42,6 +39,9 @@ class FileSystemKeyValueStoreClient(KeyValueStoreClient):
4239
_STORAGE_SUBDIR = 'key_value_stores'
4340
"""The name of the subdirectory where key-value stores are stored."""
4441

42+
_cache_by_name: ClassVar[dict[str, FileSystemKeyValueStoreClient]] = {}
43+
"""A dictionary to cache clients by their names."""
44+
4545
def __init__(
4646
self,
4747
*,
@@ -122,8 +122,8 @@ async def open(
122122
name = name or cls._DEFAULT_NAME
123123

124124
# Check if the client is already cached by name.
125-
if name in _cache_by_name:
126-
return _cache_by_name[name]
125+
if name in cls._cache_by_name:
126+
return cls._cache_by_name[name]
127127

128128
storage_dir = storage_dir or Path.cwd()
129129
kvs_path = storage_dir / cls._STORAGE_SUBDIR / name
@@ -169,7 +169,7 @@ async def open(
169169
await client._update_metadata()
170170

171171
# Cache the client by name.
172-
_cache_by_name[name] = client
172+
cls._cache_by_name[name] = client
173173

174174
return client
175175

@@ -181,8 +181,8 @@ async def drop(self) -> None:
181181
await asyncio.to_thread(shutil.rmtree, self.path_to_kvs)
182182

183183
# Remove the client from the cache.
184-
if self.name in _cache_by_name:
185-
del _cache_by_name[self.name]
184+
if self.name in self.__class__._cache_by_name: # noqa: SLF001
185+
del self.__class__._cache_by_name[self.name] # noqa: SLF001
186186

187187
@override
188188
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:

src/crawlee/storage_clients/_memory/_dataset_client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from datetime import datetime, timezone
44
from logging import getLogger
5-
from typing import TYPE_CHECKING, Any
5+
from typing import TYPE_CHECKING, Any, ClassVar
66

77
from typing_extensions import override
88

@@ -16,9 +16,6 @@
1616

1717
logger = getLogger(__name__)
1818

19-
_cache_by_name = dict[str, 'MemoryDatasetClient']()
20-
"""A dictionary to cache clients by their names."""
21-
2219

2320
class MemoryDatasetClient(DatasetClient):
2421
"""A memory implementation of the dataset client.
@@ -31,6 +28,9 @@ class MemoryDatasetClient(DatasetClient):
3128
_DEFAULT_NAME = 'default'
3229
"""The default name for the dataset when no name is provided."""
3330

31+
_cache_by_name: ClassVar[dict[str, MemoryDatasetClient]] = {}
32+
"""A dictionary to cache clients by their names."""
33+
3434
def __init__(
3535
self,
3636
*,
@@ -102,8 +102,8 @@ async def open(
102102
name = name or cls._DEFAULT_NAME
103103

104104
# Check if the client is already cached by name.
105-
if name in _cache_by_name:
106-
client = _cache_by_name[name]
105+
if name in cls._cache_by_name:
106+
client = cls._cache_by_name[name]
107107
await client._update_metadata(update_accessed_at=True) # noqa: SLF001
108108
return client
109109

@@ -120,7 +120,7 @@ async def open(
120120
)
121121

122122
# Cache the client by name
123-
_cache_by_name[name] = client
123+
cls._cache_by_name[name] = client
124124

125125
return client
126126

@@ -130,8 +130,8 @@ async def drop(self) -> None:
130130
self._metadata.item_count = 0
131131

132132
# Remove the client from the cache
133-
if self.name in _cache_by_name:
134-
del _cache_by_name[self.name]
133+
if self.name in self.__class__._cache_by_name:
134+
del self.__class__._cache_by_name[self.name]
135135

136136
@override
137137
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:

src/crawlee/storage_clients/_memory/_key_value_store_client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import sys
44
from datetime import datetime, timezone
55
from logging import getLogger
6-
from typing import TYPE_CHECKING, Any
6+
from typing import TYPE_CHECKING, Any, ClassVar
77

88
from typing_extensions import override
99

@@ -18,9 +18,6 @@
1818

1919
logger = getLogger(__name__)
2020

21-
_cache_by_name = dict[str, 'MemoryKeyValueStoreClient']()
22-
"""A dictionary to cache clients by their names."""
23-
2421

2522
class MemoryKeyValueStoreClient(KeyValueStoreClient):
2623
"""A memory implementation of the key-value store client.
@@ -33,6 +30,9 @@ class MemoryKeyValueStoreClient(KeyValueStoreClient):
3330
_DEFAULT_NAME = 'default'
3431
"""The default name for the key-value store when no name is provided."""
3532

33+
_cache_by_name: ClassVar[dict[str, MemoryKeyValueStoreClient]] = {}
34+
"""A dictionary to cache clients by their names."""
35+
3636
def __init__(
3737
self,
3838
*,
@@ -97,8 +97,8 @@ async def open(
9797
name = name or cls._DEFAULT_NAME
9898

9999
# Check if the client is already cached by name
100-
if name in _cache_by_name:
101-
client = _cache_by_name[name]
100+
if name in cls._cache_by_name:
101+
client = cls._cache_by_name[name]
102102
await client._update_metadata(update_accessed_at=True) # noqa: SLF001
103103
return client
104104

@@ -115,7 +115,7 @@ async def open(
115115
)
116116

117117
# Cache the client by name
118-
_cache_by_name[name] = client
118+
cls._cache_by_name[name] = client
119119

120120
return client
121121

@@ -125,8 +125,8 @@ async def drop(self) -> None:
125125
self._store.clear()
126126

127127
# Remove from cache
128-
if self.name in _cache_by_name:
129-
del _cache_by_name[self.name]
128+
if self.name in self.__class__._cache_by_name:
129+
del self.__class__._cache_by_name[self.name]
130130

131131
@override
132132
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:

src/crawlee/storages/_dataset.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
from io import StringIO
55
from pathlib import Path
6-
from typing import TYPE_CHECKING, Literal
6+
from typing import TYPE_CHECKING, ClassVar, Literal
77

88
from typing_extensions import override
99

@@ -30,32 +30,6 @@
3030

3131
logger = logging.getLogger(__name__)
3232

33-
# TODO:
34-
# - caching / memoization of Dataset
35-
36-
# Properties:
37-
# - id
38-
# - name
39-
# - metadata
40-
41-
# Methods:
42-
# - open
43-
# - drop
44-
# - push_data
45-
# - get_data
46-
# - iterate_items
47-
# - export_to
48-
# - export_to_json
49-
# - export_to_csv
50-
51-
# Breaking changes:
52-
# - from_storage_object method has been removed - Use the open method with name and/or id instead.
53-
# - get_info -> metadata property
54-
# - storage_object -> metadata property
55-
# - set_metadata method has been removed - Do we want to support it (e.g. for renaming)?
56-
# - write_to_json -> export_to_json
57-
# - write_to_csv -> export_to_csv
58-
5933

6034
@docs_group('Classes')
6135
class Dataset(Storage):
@@ -90,6 +64,12 @@ class Dataset(Storage):
9064
```
9165
"""
9266

67+
_cache_by_id: ClassVar[dict[str, Dataset]] = {}
68+
"""A dictionary to cache datasets by their IDs."""
69+
70+
_cache_by_name: ClassVar[dict[str, Dataset]] = {}
71+
"""A dictionary to cache datasets by their names."""
72+
9373
def __init__(self, client: DatasetClient) -> None:
9474
"""Initialize a new instance.
9575
@@ -137,6 +117,12 @@ async def open(
137117
if id and name:
138118
raise ValueError('Only one of "id" or "name" can be specified, not both.')
139119

120+
# Check if dataset is already cached by id or name
121+
if id and id in cls._cache_by_id:
122+
return cls._cache_by_id[id]
123+
if name and name in cls._cache_by_name:
124+
return cls._cache_by_name[name]
125+
140126
configuration = service_locator.get_configuration() if configuration is None else configuration
141127
storage_client = service_locator.get_storage_client() if storage_client is None else storage_client
142128
purge_on_start = configuration.purge_on_start if purge_on_start is None else purge_on_start
@@ -149,10 +135,24 @@ async def open(
149135
storage_dir=storage_dir,
150136
)
151137

152-
return cls(client)
138+
dataset = cls(client)
139+
140+
# Cache the dataset by id and name if available
141+
if dataset.id:
142+
cls._cache_by_id[dataset.id] = dataset
143+
if dataset.name:
144+
cls._cache_by_name[dataset.name] = dataset
145+
146+
return dataset
153147

154148
@override
155149
async def drop(self) -> None:
150+
# Remove from cache before dropping
151+
if self.id in self._cache_by_id:
152+
del self._cache_by_id[self.id]
153+
if self.name and self.name in self._cache_by_name:
154+
del self._cache_by_name[self.name]
155+
156156
await self._client.drop()
157157

158158
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:

0 commit comments

Comments
 (0)