
Commit 4c8dd10

fix deadlock in KVS
1 parent 59d81d8 commit 4c8dd10

4 files changed: +32 −35 lines changed

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 28 additions & 29 deletions
@@ -151,9 +151,7 @@ async def open(
 
         # Get a new instance by name.
         else:
-            kvs_path = (
-                kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else kvs_base_path / name
-            )
+            kvs_path = kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else kvs_base_path / name
             metadata_path = kvs_path / METADATA_FILENAME
 
             # If the key-value store directory exists, reconstruct the client from the metadata file.
@@ -235,7 +233,7 @@ async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
             return None
 
         # Read the metadata file
-        async with self._lock:
+        async with self._lock:  # DEADLOCK
             file = await asyncio.to_thread(open, record_metadata_filepath)
             try:
                 metadata_content = json.load(file)
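The `# DEADLOCK` marker flags one side of the hang: `asyncio.Lock` is not reentrant, so a task that already holds `self._lock` (as the old `iterate_keys` below did across its `yield`) and then awaits `get_value` blocks forever. A minimal repro sketch of that pattern, using a hypothetical toy class rather than the real client API:

import asyncio


class ToyStore:
    """Toy stand-in for the file-system KVS client (hypothetical, not the real API)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._data = {'a': 1, 'b': 2}

    async def get_value(self, key: str) -> int:
        async with self._lock:  # second acquisition by the same task -> waits forever
            return self._data[key]

    async def iterate_keys(self):
        async with self._lock:  # pre-fix shape: lock held across `yield`
            for key in self._data:
                yield key


async def main() -> None:
    store = ToyStore()
    async for key in store.iterate_keys():
        # iterate_keys is suspended at `yield` and still holds the lock,
        # so this await can never acquire it: the task deadlocks.
        print(await store.get_value(key))


asyncio.run(main())  # hangs

Any caller that read a value while iterating keys would hang in exactly this way, which is presumably what the previously skipped "broken, freezing" tests further down were tripping over.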
@@ -364,41 +362,42 @@ async def iterate_keys(
         if not self.path_to_kvs.exists():
             return
 
-        count = 0
+        # List and sort all files *inside* a brief lock, then release it immediately:
         async with self._lock:
-            # Get all files in the KVS directory, sorted alphabetically
             files = sorted(await asyncio.to_thread(list, self.path_to_kvs.glob('*')))
 
-            for file_path in files:
-                # Skip the main metadata file
-                if file_path.name == METADATA_FILENAME:
-                    continue
+        count = 0
 
-                # Only process metadata files for records
-                if not file_path.name.endswith(f'.{METADATA_FILENAME}'):
-                    continue
+        for file_path in files:
+            # Skip the main metadata file
+            if file_path.name == METADATA_FILENAME:
+                continue
 
-                # Extract the base key name from the metadata filename
-                key_name = self._decode_key(file_path.name[: -len(f'.{METADATA_FILENAME}')])
+            # Only process metadata files for records
+            if not file_path.name.endswith(f'.{METADATA_FILENAME}'):
+                continue
 
-                # Apply exclusive_start_key filter if provided
-                if exclusive_start_key is not None and key_name <= exclusive_start_key:
-                    continue
+            # Extract the base key name from the metadata filename
+            key_name = self._decode_key(file_path.name[: -len(f'.{METADATA_FILENAME}')])
 
-                # Try to read and parse the metadata file
-                try:
-                    metadata_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8')
-                    metadata_dict = json.loads(metadata_content)
-                    record_metadata = KeyValueStoreRecordMetadata(**metadata_dict)
+            # Apply exclusive_start_key filter if provided
+            if exclusive_start_key is not None and key_name <= exclusive_start_key:
+                continue
+
+            # Try to read and parse the metadata file
+            try:
+                metadata_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8')
+                metadata_dict = json.loads(metadata_content)
+                record_metadata = KeyValueStoreRecordMetadata(**metadata_dict)
 
-                    yield record_metadata
+                yield record_metadata
 
-                    count += 1
-                    if limit and count >= limit:
-                        break
+                count += 1
+                if limit and count >= limit:
+                    break
 
-                except (json.JSONDecodeError, ValidationError) as e:
-                    logger.warning(f'Failed to parse metadata file {file_path}: {e}')
+            except (json.JSONDecodeError, ValidationError) as e:
+                logger.warning(f'Failed to parse metadata file {file_path}: {e}')
 
         # Update accessed_at timestamp
         await self._update_metadata(update_accessed_at=True)
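The rewritten `iterate_keys` narrows the critical section: the directory listing is snapshotted while the lock is held, and the lock is released before the first `yield`, so a consumer can safely call back into the store mid-iteration. A sketch of the corrected generator shape, on the same hypothetical toy class as above:

import asyncio
from collections.abc import AsyncIterator


class ToyStore:
    """Toy stand-in for the file-system KVS client (hypothetical, not the real API)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._data = {'a': 1, 'b': 2}

    async def get_value(self, key: str) -> int:
        async with self._lock:
            return self._data[key]

    async def iterate_keys(self) -> AsyncIterator[str]:
        # Snapshot under a brief lock, then release it before yielding.
        async with self._lock:
            keys = sorted(self._data)
        for key in keys:  # the lock is no longer held here
            yield key


async def main() -> None:
    store = ToyStore()
    async for key in store.iterate_keys():
        print(await store.get_value(key))  # safe: the lock is free


asyncio.run(main())  # prints 1 then 2

The trade-off is that iteration runs over a point-in-time snapshot of the directory, so keys written or deleted after the listing are not reflected; that is the usual price of not holding a lock across a `yield`.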

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 1 addition & 3 deletions
@@ -171,9 +171,7 @@ async def open(
 
         # Get a new instance by name.
         else:
-            rq_path = (
-                rq_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else rq_base_path / name
-            )
+            rq_path = rq_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else rq_base_path / name
             metadata_path = rq_path / METADATA_FILENAME
 
             # If the RQ directory exists, reconstruct the client from the metadata file.

tests/unit/crawlers/_http/test_http_crawler.py

Lines changed: 2 additions & 2 deletions
@@ -543,9 +543,9 @@ async def request_handler(context: HttpCrawlingContext) -> None:
         assert snapshot.html == HELLO_WORLD.decode('utf8')
 
 
-@pytest.mark.skip(reason='TODO: broken, freezing')
 async def test_error_snapshot_through_statistics(server_url: URL) -> None:
-    crawler = HttpCrawler(statistics=Statistics.with_default_state(save_error_snapshots=True))
+    statistics = Statistics.with_default_state(save_error_snapshots=True)
+    crawler = HttpCrawler(statistics=statistics)
 
     @crawler.router.default_handler
     async def request_handler(context: HttpCrawlingContext) -> None:

tests/unit/crawlers/_playwright/test_playwright_crawler.py

Lines changed: 1 addition & 1 deletion
@@ -553,7 +553,7 @@ async def request_handler(context: PlaywrightCrawlingContext) -> None:
         assert snapshot.html == HELLO_WORLD.decode('utf-8')
 
 
-@pytest.mark.skip(reason='TODO: broken, freezing')
+# TODO: failing
 async def test_error_snapshot_through_statistics(server_url: URL) -> None:
     """Test correct use of error snapshotter by the Playwright crawler.
559559
