
Commit 60e0968

Address feedback

1 parent 9aaee11 commit 60e0968

File tree

9 files changed: +103 -69 lines changed

src/crawlee/_autoscaling/autoscaled_pool.py

Lines changed: 1 addition & 2 deletions
@@ -142,8 +142,7 @@ async def run(self) -> None:

             logger.info('Waiting for remaining tasks to finish')

-            tasks_to_wait = list(run.worker_tasks)
-            for task in tasks_to_wait:
+            for task in run.worker_tasks:
                 if not task.done():
                     with suppress(BaseException):
                         await task
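The simplification drops the defensive list() copy and awaits the worker tasks directly. A minimal standalone sketch of the drain pattern this hunk keeps, with drain() as a hypothetical wrapper (not crawlee API) around the same loop:

import asyncio
from contextlib import suppress

async def drain(worker_tasks: list[asyncio.Task]) -> None:
    # Await each task that has not finished yet; suppress(BaseException)
    # mirrors the pool's behaviour of ignoring worker failures on shutdown.
    for task in worker_tasks:
        if not task.done():
            with suppress(BaseException):
                await task

async def main() -> None:
    tasks = [asyncio.create_task(asyncio.sleep(0.1)) for _ in range(3)]
    await drain(tasks)

asyncio.run(main())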

src/crawlee/_utils/file.py

Lines changed: 48 additions & 32 deletions
@@ -6,7 +6,7 @@
 import os
 import tempfile
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, overload

 if TYPE_CHECKING:
     from collections.abc import AsyncIterator

@@ -54,59 +54,75 @@ async def json_dumps(obj: Any) -> str:
     return await asyncio.to_thread(json.dumps, obj, ensure_ascii=False, indent=2, default=str)


-async def atomic_write_text(path: Path, data: str) -> None:
-    dir_path = path.parent
+@overload
+async def atomic_write(
+    path: Path,
+    data: str,
+    *,
+    is_binary: bool = False,
+) -> None: ...

-    def _sync_write_text() -> str:
-        # create a temp file in the target dir, return its name
-        fd, tmp_path = tempfile.mkstemp(
-            suffix=f'{path.suffix}.tmp',
-            prefix=f'{path.name}.',
-            dir=str(dir_path),
-        )
-        try:
-            with os.fdopen(fd, 'w', encoding='utf-8') as tmp_file:
-                tmp_file.write(data)
-        except:
-            Path(tmp_path).unlink(missing_ok=True)
-            raise
-        return tmp_path

-    tmp_path = await asyncio.to_thread(_sync_write_text)
+@overload
+async def atomic_write(
+    path: Path,
+    data: bytes,
+    *,
+    is_binary: bool = True,
+) -> None: ...

-    try:
-        await asyncio.to_thread(os.replace, tmp_path, str(path))
-    except (FileNotFoundError, PermissionError):
-        # fallback if tmp went missing
-        await asyncio.to_thread(path.write_text, data, encoding='utf-8')
-    finally:
-        await asyncio.to_thread(Path(tmp_path).unlink, missing_ok=True)

+async def atomic_write(
+    path: Path,
+    data: str | bytes,
+    *,
+    is_binary: bool = False,
+) -> None:
+    """Write data to a file atomically to prevent data corruption or partial writes.

-async def atomic_write_bytes(path: Path, data: bytes) -> None:
+    This function handles both text and binary data. It ensures atomic writing by creating
+    a temporary file and then atomically replacing the target file, which prevents data
+    corruption if the process is interrupted during the write operation.
+
+    For example, if a process (crawler) is interrupted while writing a file, the file may end up in an
+    incomplete or corrupted state. This might be especially unwanted for metadata files.
+
+    Args:
+        path: The path to the destination file.
+        data: The data to write to the file (string or bytes).
+        is_binary: If True, write in binary mode. If False (default), write in text mode.
+    """
     dir_path = path.parent

-    def _sync_write_bytes() -> str:
+    def _sync_write() -> str:
+        # create a temp file in the target dir, return its name
         fd, tmp_path = tempfile.mkstemp(
             suffix=f'{path.suffix}.tmp',
             prefix=f'{path.name}.',
             dir=str(dir_path),
         )
         try:
-            with os.fdopen(fd, 'wb') as tmp_file:
-                tmp_file.write(data)
-        except:
+            if is_binary:
+                with os.fdopen(fd, 'wb') as tmp_file:
+                    tmp_file.write(data)  # type: ignore[arg-type]
+            else:
+                with os.fdopen(fd, 'w', encoding='utf-8') as tmp_file:
+                    tmp_file.write(data)  # type: ignore[arg-type]
+        except Exception:  # broader exception handling
             Path(tmp_path).unlink(missing_ok=True)
             raise
         return tmp_path

-    tmp_path = await asyncio.to_thread(_sync_write_bytes)
+    tmp_path = await asyncio.to_thread(_sync_write)

     try:
         await asyncio.to_thread(os.replace, tmp_path, str(path))
     except (FileNotFoundError, PermissionError):
         # fallback if tmp went missing
-        await asyncio.to_thread(path.write_bytes, data)
+        if is_binary:
+            await asyncio.to_thread(path.write_bytes, data)  # type: ignore[arg-type]
+        else:
+            await asyncio.to_thread(path.write_text, data, encoding='utf-8')  # type: ignore[arg-type]
     finally:
         await asyncio.to_thread(Path(tmp_path).unlink, missing_ok=True)
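For context, a short usage sketch of the unified helper. Note that crawlee._utils.file is an internal module, so the import path reflects this commit rather than a public API; the file names are illustrative:

import asyncio
from pathlib import Path

from crawlee._utils.file import atomic_write  # internal helper as of this commit

async def main() -> None:
    # Text mode (default): data goes to a temp file in the same directory,
    # then os.replace() swaps it into place, so readers never observe a
    # partially written file.
    await atomic_write(Path('metadata.json'), '{"item_count": 0}')

    # Binary mode is selected explicitly via the is_binary flag.
    await atomic_write(Path('record.bin'), b'\x00\x01\x02', is_binary=True)

asyncio.run(main())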

src/crawlee/statistics/_error_snapshotter.py

Lines changed: 37 additions & 17 deletions
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import hashlib
 import re
 import string

@@ -28,23 +29,42 @@ async def capture_snapshot(
         file_and_line: str,
         context: BasicCrawlingContext,
     ) -> None:
-        """Capture error snapshot and save it to key value store."""
-        snapshot = await context.get_snapshot()
-        if not snapshot:
-            return
-
-        base = self._get_snapshot_base_name(error_message, file_and_line)
-        kvs = await KeyValueStore.open(name=self._kvs_name)
-
-        # Save HTML snapshot if present
-        if snapshot.html:
-            key_html = f'{base}.html'
-            await kvs.set_value(key_html, snapshot.html, content_type='text/html')
-
-        # Save screenshot snapshot if present
-        if snapshot.screenshot:
-            key_jpg = f'{base}.jpg'
-            await kvs.set_value(key_jpg, snapshot.screenshot, content_type='image/jpeg')
+        """Capture error snapshot and save it to key value store.
+
+        It saves the error snapshot directly to a key value store. It can't use `context.get_key_value_store`
+        because it returns `KeyValueStoreChangeRecords`, which is committed to the key value store only if the
+        `RequestHandler` returned without an exception. ErrorSnapshotter, on the contrary, is active only when
+        `RequestHandler` fails with an exception.
+
+        Args:
+            error_message: Used in the filename of the snapshot.
+            file_and_line: Used in the filename of the snapshot.
+            context: Context that is used to get the snapshot.
+        """
+        if snapshot := await context.get_snapshot():
+            kvs = await KeyValueStore.open(name=self._kvs_name)
+            snapshot_base_name = self._get_snapshot_base_name(error_message, file_and_line)
+            snapshot_save_tasks = list[asyncio.Task]()
+
+            if snapshot.html:
+                snapshot_save_tasks.append(
+                    asyncio.create_task(self._save_html(kvs, snapshot.html, base_name=snapshot_base_name))
+                )
+
+            if snapshot.screenshot:
+                snapshot_save_tasks.append(
+                    asyncio.create_task(self._save_screenshot(kvs, snapshot.screenshot, base_name=snapshot_base_name))
+                )
+
+            await asyncio.gather(*snapshot_save_tasks)
+
+    async def _save_html(self, kvs: KeyValueStore, html: str, base_name: str) -> None:
+        file_name = f'{base_name}.html'
+        await kvs.set_value(file_name, html, content_type='text/html')
+
+    async def _save_screenshot(self, kvs: KeyValueStore, screenshot: bytes, base_name: str) -> None:
+        file_name = f'{base_name}.jpg'
+        await kvs.set_value(file_name, screenshot, content_type='image/jpeg')

     def _sanitize_filename(self, filename: str) -> str:
         return re.sub(f'[^{re.escape(self.ALLOWED_CHARACTERS)}]', '', filename[: self.MAX_FILENAME_LENGTH])
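The rewrite saves the HTML and the screenshot concurrently rather than one after the other. A self-contained sketch of the same fan-out pattern, with save() as a hypothetical stand-in for the kvs.set_value calls (names are illustrative, not crawlee API):

from __future__ import annotations

import asyncio

async def save(key: str, payload: str | bytes) -> None:
    await asyncio.sleep(0.1)  # stands in for the key-value store write
    print(f'saved {key}')

async def capture(html: str | None, screenshot: bytes | None) -> None:
    tasks = list[asyncio.Task]()
    if html:
        tasks.append(asyncio.create_task(save('snapshot.html', html)))
    if screenshot:
        tasks.append(asyncio.create_task(save('snapshot.jpg', screenshot)))
    # Both writes run concurrently; gather() re-raises the first failure.
    await asyncio.gather(*tasks)

asyncio.run(capture('<html></html>', b'\xff\xd8'))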

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 3 additions & 3 deletions
@@ -13,7 +13,7 @@

 from crawlee._consts import METADATA_FILENAME
 from crawlee._utils.crypto import crypto_random_object_id
-from crawlee._utils.file import atomic_write_text, json_dumps
+from crawlee._utils.file import atomic_write, json_dumps
 from crawlee.storage_clients._base import DatasetClient
 from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

@@ -442,7 +442,7 @@ async def _update_metadata(

         # Dump the serialized metadata to the file.
         data = await json_dumps(self._metadata.model_dump())
-        await atomic_write_text(self.path_to_metadata, data)
+        await atomic_write(self.path_to_metadata, data)

     async def _push_item(self, item: dict[str, Any], item_id: int) -> None:
         """Push a single item to the dataset.

@@ -463,7 +463,7 @@ async def _push_item(self, item: dict[str, Any], item_id: int) -> None:

         # Dump the serialized item to the file.
         data = await json_dumps(item)
-        await atomic_write_text(file_path, data)
+        await atomic_write(file_path, data)

     async def _get_sorted_data_files(self) -> list[Path]:
         """Retrieve and return a sorted list of data files in the dataset directory.

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 4 additions & 4 deletions
@@ -14,7 +14,7 @@

 from crawlee._consts import METADATA_FILENAME
 from crawlee._utils.crypto import crypto_random_object_id
-from crawlee._utils.file import atomic_write_bytes, atomic_write_text, infer_mime_type, json_dumps
+from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
 from crawlee.storage_clients._base import KeyValueStoreClient
 from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata

@@ -325,10 +325,10 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None:
         await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)

         # Write the value to the file.
-        await atomic_write_bytes(record_path, value_bytes)
+        await atomic_write(record_path, value_bytes, is_binary=True)

         # Write the record metadata to the file.
-        await atomic_write_text(record_metadata_filepath, record_metadata_content)
+        await atomic_write(record_metadata_filepath, record_metadata_content)

         # Update the KVS metadata to record the access and modification.
         await self._update_metadata(update_accessed_at=True, update_modified_at=True)

@@ -445,7 +445,7 @@ async def _update_metadata(

         # Dump the serialized metadata to the file.
         data = await json_dumps(self._metadata.model_dump())
-        await atomic_write_text(self.path_to_metadata, data)
+        await atomic_write(self.path_to_metadata, data)

     def _encode_key(self, key: str) -> str:
         """Encode a key to make it safe for use in a file path."""

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 6 additions & 6 deletions
@@ -14,7 +14,7 @@
 from crawlee import Request
 from crawlee._consts import METADATA_FILENAME
 from crawlee._utils.crypto import crypto_random_object_id
-from crawlee._utils.file import atomic_write_text, json_dumps
+from crawlee._utils.file import atomic_write, json_dumps
 from crawlee.storage_clients._base import RequestQueueClient
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

@@ -336,7 +336,7 @@ async def add_batch_of_requests(
             # Update the existing request file
             request_path = self.path_to_rq / f'{existing_request.id}.json'
             request_data = await json_dumps(existing_request.model_dump())
-            await atomic_write_text(request_path, request_data)
+            await atomic_write(request_path, request_data)

             processed_requests.append(
                 ProcessedRequest(

@@ -362,7 +362,7 @@ async def add_batch_of_requests(
             request_dict['_sequence'] = sequence_number

             request_data = await json_dumps(request_dict)
-            await atomic_write_text(request_path, request_data)
+            await atomic_write(request_path, request_data)

             # Update metadata counts
             new_total_request_count += 1

@@ -620,7 +620,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             return None

         request_data = await json_dumps(request.model_dump())
-        await atomic_write_text(request_path, request_data)
+        await atomic_write(request_path, request_data)

         # Update metadata timestamps
         await self._update_metadata(

@@ -678,7 +678,7 @@ async def reclaim_request(
             return None

         request_data = await json_dumps(request.model_dump())
-        await atomic_write_text(request_path, request_data)
+        await atomic_write(request_path, request_data)

         # Update metadata timestamps
         await self._update_metadata(update_modified_at=True, update_accessed_at=True)

@@ -781,4 +781,4 @@ async def _update_metadata(

         # Dump the serialized metadata to the file.
         data = await json_dumps(self._metadata.model_dump())
-        await atomic_write_text(self.path_to_metadata, data)
+        await atomic_write(self.path_to_metadata, data)

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 0 additions & 1 deletion
@@ -618,7 +618,6 @@ async def test_crawler_get_storages() -> None:
     assert isinstance(kvs, KeyValueStore)


-# THIS
 async def test_crawler_run_requests() -> None:
     crawler = BasicCrawler()
     seen_urls = list[str]()

tests/unit/storage_clients/_file_system/test_fs_kvs_client.py

Lines changed: 2 additions & 2 deletions
@@ -312,9 +312,9 @@ async def test_iterate_keys_with_exclusive_start_key(kvs_client: FileSystemKeyValueStoreClient) -> None:
     """Test that exclusive_start_key parameter returns only keys after it alphabetically."""
     # Add some values with alphabetical keys
     await kvs_client.set_value(key='a-key', value='value-a')
-    await kvs_client.set_value(key='b-key', value='value-b')
-    await kvs_client.set_value(key='c-key', value='value-c')
     await kvs_client.set_value(key='d-key', value='value-d')
+    await kvs_client.set_value(key='c-key', value='value-c')
+    await kvs_client.set_value(key='b-key', value='value-b')

     # Iterate with exclusive start key
     keys = [key.key async for key in kvs_client.iterate_keys(exclusive_start_key='b-key')]
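The keys are now deliberately inserted out of alphabetical order, which is the point of the change: a client that merely replayed insertion order would previously have passed by accident. A tiny sketch of the property the test now pins down:

# Insertion order differs from alphabetical order, so only genuinely
# sorted iteration satisfies the assertions below.
inserted = ['a-key', 'd-key', 'c-key', 'b-key']

iteration_order = sorted(inserted)  # what iterate_keys() is expected to yield
assert iteration_order == ['a-key', 'b-key', 'c-key', 'd-key']

# exclusive_start_key='b-key' yields only keys strictly after 'b-key'.
assert [k for k in iteration_order if k > 'b-key'] == ['c-key', 'd-key']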

tests/unit/storage_clients/_memory/test_memory_kvs_client.py

Lines changed: 2 additions & 2 deletions
@@ -167,7 +167,7 @@ async def test_iterate_keys(kvs_client: MemoryKeyValueStoreClient) -> None:
 async def test_iterate_keys_with_exclusive_start_key(kvs_client: MemoryKeyValueStoreClient) -> None:
     """Test that exclusive_start_key parameter returns only keys after it alphabetically."""
     # Set some values
-    for key in ['a_key', 'b_key', 'c_key', 'd_key', 'e_key']:
+    for key in ['b_key', 'c_key', 'a_key', 'e_key', 'd_key']:
         await kvs_client.set_value(key=key, value=f'value for {key}')

     # Get keys starting after 'b_key'

@@ -181,7 +181,7 @@ async def test_iterate_keys_with_limit(kvs_client: MemoryKeyValueStoreClient) -> None:
     """Test that the limit parameter returns only the specified number of keys."""
     # Set some values
-    for key in ['a_key', 'b_key', 'c_key', 'd_key', 'e_key']:
+    for key in ['a_key', 'e_key', 'c_key', 'b_key', 'd_key']:
         await kvs_client.set_value(key=key, value=f'value for {key}')

     # Get first 3 keys
