
Commit da22ef5

Add Dataset & KVS file system clients tests

1 parent ff516ed commit da22ef5

20 files changed, +756 -1559 lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -164,6 +164,7 @@ indent-style = "space"
    "F401", # Unused imports
]
"**/{tests}/*" = [
+    "ASYNC230", # Async functions should not open files with blocking methods like `open`
    "D", # Everything from the pydocstyle
    "INP001", # File {filename} is part of an implicit namespace package, add an __init__.py
    "PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 72 additions & 48 deletions
@@ -23,6 +23,9 @@

logger = getLogger(__name__)

+_cache_by_name = dict[str, 'FileSystemDatasetClient']()
+"""A dictionary to cache clients by their names."""
+

class FileSystemDatasetClient(DatasetClient):
    """A file system storage implementation of the dataset client.
@@ -55,12 +58,15 @@ def __init__(

        Preferably use the `FileSystemDatasetClient.open` class method to create a new instance.
        """
-        self._id = id
-        self._name = name
-        self._created_at = created_at
-        self._accessed_at = accessed_at
-        self._modified_at = modified_at
-        self._item_count = item_count
+        self._metadata = DatasetMetadata(
+            id=id,
+            name=name,
+            created_at=created_at,
+            accessed_at=accessed_at,
+            modified_at=modified_at,
+            item_count=item_count,
+        )
+
        self._storage_dir = storage_dir

        # Internal attributes.
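Folding the six separate attributes into one `DatasetMetadata` instance gives the in-memory state and the persisted metadata file a single source of truth, since `_update_metadata` later serializes `self._metadata.model_dump()` directly (which suggests `DatasetMetadata` is a pydantic model). Roughly, the metadata file after two pushed items would look like this (a sketch; exact field names, id value, and timestamp formats depend on the model):

# {
#     "id": "5f3a...",                              # made-up value
#     "name": "default",
#     "created_at": "2025-01-01T00:00:00+00:00",
#     "accessed_at": "2025-01-01T00:00:05+00:00",
#     "modified_at": "2025-01-01T00:00:05+00:00",
#     "item_count": 2
# }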
@@ -70,49 +76,50 @@ def __init__(
    @override
    @property
    def id(self) -> str:
-        return self._id
+        return self._metadata.id

    @override
    @property
-    def name(self) -> str | None:
-        return self._name
+    def name(self) -> str:
+        return self._metadata.name

    @override
    @property
    def created_at(self) -> datetime:
-        return self._created_at
+        return self._metadata.created_at

    @override
    @property
    def accessed_at(self) -> datetime:
-        return self._accessed_at
+        return self._metadata.accessed_at

    @override
    @property
    def modified_at(self) -> datetime:
-        return self._modified_at
+        return self._metadata.modified_at

    @override
    @property
    def item_count(self) -> int:
-        return self._item_count
+        return self._metadata.item_count

    @property
-    def _path_to_dataset(self) -> Path:
+    def path_to_dataset(self) -> Path:
        """The full path to the dataset directory."""
-        return self._storage_dir / self._STORAGE_SUBDIR / self._name
+        return self._storage_dir / self._STORAGE_SUBDIR / self.name

    @property
-    def _path_to_metadata(self) -> Path:
+    def path_to_metadata(self) -> Path:
        """The full path to the dataset metadata file."""
-        return self._path_to_dataset / METADATA_FILENAME
+        return self.path_to_dataset / METADATA_FILENAME

    @override
    @classmethod
    async def open(
        cls,
-        id: str | None,
-        name: str | None,
+        *,
+        id: str | None = None,
+        name: str | None = None,
        storage_dir: Path,
    ) -> FileSystemDatasetClient:
        """Open an existing dataset client or create a new one if it does not exist.
@@ -134,6 +141,11 @@ async def open(
        )

        name = name or cls._DEFAULT_NAME
+
+        # Check if the client is already cached by name.
+        if name in _cache_by_name:
+            return _cache_by_name[name]
+
        dataset_path = storage_dir / cls._STORAGE_SUBDIR / name
        metadata_path = dataset_path / METADATA_FILENAME

@@ -178,25 +190,40 @@ async def open(
        )
        await client._update_metadata()

+        # Cache the client by name.
+        _cache_by_name[name] = client
+
        return client

    @override
    async def drop(self) -> None:
-        # If the dataset directory exists, remove it recursively.
-        if self._path_to_dataset.exists():
+        # If the client directory exists, remove it recursively.
+        if self.path_to_dataset.exists():
            async with self._lock:
-                await asyncio.to_thread(shutil.rmtree, self._path_to_dataset)
+                await asyncio.to_thread(shutil.rmtree, self.path_to_dataset)
+
+        # Remove the client from the cache.
+        if self.name in _cache_by_name:
+            del _cache_by_name[self.name]

    @override
    async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
+        new_item_count = self.item_count
+
        # If data is a list, push each item individually.
        if isinstance(data, list):
            for item in data:
-                await self._push_item(item)
+                new_item_count += 1
+                await self._push_item(item, new_item_count)
        else:
-            await self._push_item(data)
+            new_item_count += 1
+            await self._push_item(data, new_item_count)

-        await self._update_metadata(update_accessed_at=True, update_modified_at=True)
+        await self._update_metadata(
+            update_accessed_at=True,
+            update_modified_at=True,
+            new_item_count=new_item_count,
+        )

    @override
    async def get_data(
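Combined with the cache check in `open`, repeated opens of the same name now return a single shared client until `drop` evicts it. A minimal sketch of the resulting behavior (the `storage_dir` value is arbitrary):

from pathlib import Path

async def demo() -> None:
    storage_dir = Path('./storage')

    client_a = await FileSystemDatasetClient.open(name='my-dataset', storage_dir=storage_dir)
    client_b = await FileSystemDatasetClient.open(name='my-dataset', storage_dir=storage_dir)
    assert client_a is client_b  # the second open() is served from _cache_by_name

    await client_a.drop()  # removes the dataset directory and evicts the cache entry

    client_c = await FileSystemDatasetClient.open(name='my-dataset', storage_dir=storage_dir)
    assert client_c is not client_a  # a fresh client is created after drop()

Note that the cache key is the name alone, so a cache hit returns the existing client even if a different `storage_dir` or `id` is passed.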
@@ -223,8 +250,8 @@ async def get_data(
        )

        # If the dataset directory does not exist, log a warning and return an empty page.
-        if not self._path_to_dataset.exists():
-            logger.warning(f'Dataset directory not found: {self._path_to_dataset}')
+        if not self.path_to_dataset.exists():
+            logger.warning(f'Dataset directory not found: {self.path_to_dataset}')
            return DatasetItemsListPage(
                count=0,
                offset=offset,

@@ -298,8 +325,8 @@ async def iterate(
        )

        # If the dataset directory does not exist, log a warning and return immediately.
-        if not self._path_to_dataset.exists():
-            logger.warning(f'Dataset directory not found: {self._path_to_dataset}')
+        if not self.path_to_dataset.exists():
+            logger.warning(f'Dataset directory not found: {self.path_to_dataset}')
            return

        # Get the list of sorted data files.
@@ -334,47 +361,44 @@
    async def _update_metadata(
        self,
        *,
+        new_item_count: int | None = None,
        update_accessed_at: bool = False,
        update_modified_at: bool = False,
    ) -> None:
        """Update the dataset metadata file with current information.

        Args:
+            new_item_count: If provided, update the item count to this value.
            update_accessed_at: If True, update the `accessed_at` timestamp to the current time.
            update_modified_at: If True, update the `modified_at` timestamp to the current time.
        """
        now = datetime.now(timezone.utc)
-        metadata = DatasetMetadata(
-            id=self._id,
-            name=self._name,
-            created_at=self._created_at,
-            accessed_at=now if update_accessed_at else self._accessed_at,
-            modified_at=now if update_modified_at else self._modified_at,
-            item_count=self._item_count,
-        )
+
+        self._metadata.accessed_at = now if update_accessed_at else self.accessed_at
+        self._metadata.modified_at = now if update_modified_at else self.modified_at
+        self._metadata.item_count = new_item_count if new_item_count else self.item_count

        # Ensure the parent directory for the metadata file exists.
-        await asyncio.to_thread(self._path_to_metadata.parent.mkdir, parents=True, exist_ok=True)
+        await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True)

        # Dump the serialized metadata to the file.
-        data = await json_dumps(metadata.model_dump())
-        await asyncio.to_thread(self._path_to_metadata.write_text, data, encoding='utf-8')
+        data = await json_dumps(self._metadata.model_dump())
+        await asyncio.to_thread(self.path_to_metadata.write_text, data, encoding='utf-8')

-    async def _push_item(self, item: dict[str, Any]) -> None:
+    async def _push_item(self, item: dict[str, Any], item_id: int) -> None:
        """Push a single item to the dataset.

        This method increments the item count, writes the item as a JSON file with a zero-padded filename,
        and updates the metadata.
        """
        # Acquire the lock to perform file operations safely.
        async with self._lock:
-            self._item_count += 1
            # Generate the filename for the new item using zero-padded numbering.
-            filename = f'{str(self._item_count).zfill(self._LOCAL_ENTRY_NAME_DIGITS)}.json'
-            file_path = self._path_to_dataset / filename
+            filename = f'{str(item_id).zfill(self._LOCAL_ENTRY_NAME_DIGITS)}.json'
+            file_path = self.path_to_dataset / filename

            # Ensure the dataset directory exists.
-            await asyncio.to_thread(self._path_to_dataset.mkdir, parents=True, exist_ok=True)
+            await asyncio.to_thread(self.path_to_dataset.mkdir, parents=True, exist_ok=True)

            # Dump the serialized item to the file.
            data = await json_dumps(item)
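Since `_push_item` now receives the item id from `push_data` instead of mutating a counter, each id maps directly to a zero-padded data file. An illustrative layout after pushing two items (the padding width comes from `_LOCAL_ENTRY_NAME_DIGITS` and the metadata filename from `METADATA_FILENAME`; neither value appears in this diff, so the names below are illustrative):

await client.push_data([{'a': 1}, {'b': 2}])
# path_to_dataset/
# ├── __metadata__.json   # illustrative METADATA_FILENAME; item_count == 2 after the update
# ├── 000000001.json      # {'a': 1}
# └── 000000002.json      # {'b': 2}

Incidentally, the unchanged docstring still says the method "increments the item count", which no longer matches the new signature; and `new_item_count if new_item_count else self.item_count` would treat an explicit 0 as "not provided" (an `is not None` check would be stricter).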
@@ -389,12 +413,12 @@ async def _get_sorted_data_files(self) -> list[Path]:
        # Retrieve and sort all JSON files in the dataset directory numerically.
        files = await asyncio.to_thread(
            sorted,
-            self._path_to_dataset.glob('*.json'),
+            self.path_to_dataset.glob('*.json'),
            key=lambda f: int(f.stem) if f.stem.isdigit() else 0,
        )

        # Remove the metadata file from the list if present.
-        if self._path_to_metadata in files:
-            files.remove(self._path_to_metadata)
+        if self.path_to_metadata in files:
+            files.remove(self.path_to_metadata)

        return files