Skip to content

Commit 0e6d87c

Browse files
committed
Add even more debug. Probably another bug in RQ
1 parent 3730920 commit 0e6d87c

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,36 +546,45 @@ async def reclaim_request(
546546

547547
# Update sequence number and state to ensure proper ordering.
548548
if forefront:
549+
logger.info('Reclaiming forefront request')
549550
# Remove from regular requests if it was there
550551
state.regular_requests.pop(request.unique_key, None)
551552
sequence_number = state.forefront_sequence_counter
552553
state.forefront_sequence_counter += 1
553554
state.forefront_requests[request.unique_key] = sequence_number
554555
else:
556+
logger.info('Reclaiming regular request')
555557
# Remove from forefront requests if it was there
556558
state.forefront_requests.pop(request.unique_key, None)
557559
sequence_number = state.sequence_counter
558560
state.sequence_counter += 1
559561
state.regular_requests[request.unique_key] = sequence_number
560562

561563
# Save the clean request without extra fields
564+
562565
request_data = await json_dumps(request.model_dump())
566+
logger.info('Atomic write')
563567
await atomic_write(request_path, request_data)
564568

565569
# Remove from in-progress.
570+
logger.info('Remove from in-progress')
566571
state.in_progress_requests.discard(request.unique_key)
567572

573+
logger.info('Update RQ metadata.')
568574
# Update RQ metadata.
569575
await self._update_metadata(
570576
update_modified_at=True,
571577
update_accessed_at=True,
572578
)
579+
logger.info('Updated RQ metadata.')
573580

574581
# Add the request back to the cache.
575582
if forefront:
576583
self._request_cache.appendleft(request)
584+
logger.info(f'Add the request back to the cache: forefront. {self._request_cache}')
577585
else:
578586
self._request_cache.append(request)
587+
logger.info(f'Add the request back to the cache: normal. . {self._request_cache}')
579588

580589
return ProcessedRequest(
581590
unique_key=request.unique_key,
@@ -595,15 +604,18 @@ async def is_empty(self) -> bool:
595604
# If there are in-progress requests, return False immediately.
596605
if len(state.in_progress_requests) > 0:
597606
self._is_empty_cache = False
607+
logger.info(f'{state.in_progress_requests=}')
598608
return False
599609

600610
# If we have a cached requests, check them first (fast path).
601611
if self._request_cache:
602612
for req in self._request_cache:
603613
if req.unique_key not in state.handled_requests:
604614
self._is_empty_cache = False
615+
logger.info(f'{(req.unique_key not in state.handled_requests)=}')
605616
return False
606617
self._is_empty_cache = True
618+
logger.info(f'{(len(state.in_progress_requests) == 0)=}')
607619
return len(state.in_progress_requests) == 0
608620

609621
# Fallback: check state for unhandled requests.
@@ -615,9 +627,11 @@ async def is_empty(self) -> bool:
615627

616628
if unhandled_requests:
617629
self._is_empty_cache = False
630+
logger.info(f'{unhandled_requests=}')
618631
return False
619632

620633
self._is_empty_cache = True
634+
logger.info('Last resort is empty')
621635
return True
622636

623637
def _get_request_path(self, unique_key: str) -> Path:

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,6 +1300,7 @@ async def test_timeout_in_handler(sleep_type: str) -> None:
13001300
max_request_retries=max_request_retries,
13011301
)
13021302
crawler.log.setLevel(logging.DEBUG)
1303+
logging.getLogger('crawlee.storage_clients._file_system._request_queue_client').setLevel(logging.DEBUG)
13031304

13041305
mocked_handler_before_sleep = Mock()
13051306
mocked_handler_after_sleep = Mock()

0 commit comments

Comments
 (0)