Skip to content

Commit 134e8be

Browse files
committed
Add with debug stuff
1 parent e3c9fc2 commit 134e8be

File tree

3 files changed: +103 −50 lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
logger = getLogger(__name__)
2828

29+
COUNTER = iter(range(10000))
2930

3031
class ApifyRequestQueueClient(RequestQueueClient):
3132
"""An Apify platform implementation of the request queue client."""
@@ -294,18 +295,25 @@ async def fetch_next_request(self) -> Request | None:
294295
Returns:
295296
The request or `None` if there are no more pending requests.
296297
"""
298+
call_time = next(COUNTER)
297299
# Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions.
300+
logger.debug(f'Before _fetch_lock, {call_time}')
298301
async with self._fetch_lock:
302+
logger.debug(f'Fetching, {call_time}')
299303
await self._ensure_head_is_non_empty()
300304

301305
# If queue head is empty after ensuring, there are no requests
302306
if not self._queue_head:
307+
logger.debug(f'Empty, {call_time}')
303308
return None
304309

305310
# Get the next request ID from the queue head
306311
next_request_id = self._queue_head.popleft()
312+
logger.debug(f'New request, {call_time}')
307313

314+
logger.debug(f'Before hydrate, {call_time}')
308315
request = await self._get_or_hydrate_request(next_request_id)
316+
logger.debug(f'After hydrate, {call_time}')
309317

310318
# Handle potential inconsistency where request might not be in the main table yet
311319
if request is None:
@@ -324,14 +332,15 @@ async def fetch_next_request(self) -> Request | None:
324332
return None
325333

326334
# Use get request to ensure we have the full request object.
327-
request = await self.get_request(request.id)
335+
#request = await self.get_request(request.id) This seems redundant
328336
if request is None:
329337
logger.debug(
330338
'Request fetched from the beginning of queue was not found in the RQ',
331339
extra={'nextRequestId': next_request_id},
332340
)
333341
return None
334342

343+
logger.debug(f'{request.retry_count=}, {call_time}')
335344
return request
336345

337346
@override
@@ -394,42 +403,48 @@ async def reclaim_request(
394403
"""
395404
# Check if the request was marked as handled and clear it. When reclaiming,
396405
# we want to put the request back for processing.
406+
call_time = next(COUNTER)
397407
if request.was_already_handled:
398408
request.handled_at = None
399409

400-
try:
401-
# Update the request in the API.
402-
processed_request = await self._update_request(request, forefront=forefront)
403-
processed_request.unique_key = request.unique_key
404-
405-
# If the request was previously handled, decrement our handled count since
406-
# we're putting it back for processing.
407-
if request.was_already_handled and not processed_request.was_already_handled:
408-
self._assumed_handled_count -= 1
409-
410-
# Update the cache
411-
cache_key = unique_key_to_request_id(request.unique_key)
412-
self._cache_request(
413-
cache_key,
414-
processed_request,
415-
hydrated_request=request,
416-
)
410+
async with self._fetch_lock:
411+
try:
412+
# Update the request in the API.
413+
logger.debug(f'Before _update_request reclaiming, {call_time}')
414+
processed_request = await self._update_request(request, forefront=forefront)
415+
logger.debug(f'After _update_request reclaiming, {call_time}')
416+
processed_request.unique_key = request.unique_key
417+
418+
# If the request was previously handled, decrement our handled count since
419+
# we're putting it back for processing.
420+
if request.was_already_handled and not processed_request.was_already_handled:
421+
self._assumed_handled_count -= 1
422+
423+
# Update the cache
424+
cache_key = unique_key_to_request_id(request.unique_key)
425+
self._cache_request(
426+
cache_key,
427+
processed_request,
428+
hydrated_request=request,
429+
)
417430

418-
# If we're adding to the forefront, we need to check for forefront requests
419-
# in the next list_head call
420-
if forefront:
421-
self._should_check_for_forefront_requests = True
431+
# If we're adding to the forefront, we need to check for forefront requests
432+
# in the next list_head call
433+
if forefront:
434+
self._should_check_for_forefront_requests = True
422435

423-
# Try to release the lock on the request
424-
try:
425-
await self._delete_request_lock(request.id, forefront=forefront)
426-
except Exception as err:
427-
logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
428-
except Exception as exc:
429-
logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
430-
return None
431-
else:
432-
return processed_request
436+
# Try to release the lock on the request
437+
try:
438+
logger.debug(f'Before _delete_request_lock reclaiming, {call_time}')
439+
await self._delete_request_lock(request.id, forefront=forefront)
440+
logger.debug(f'After _delete_request_lock reclaiming, {call_time}')
441+
except Exception as err:
442+
logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
443+
except Exception as exc:
444+
logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
445+
return None
446+
else:
447+
return processed_request
433448

434449
@override
435450
async def is_empty(self) -> bool:
@@ -438,9 +453,14 @@ async def is_empty(self) -> bool:
438453
Returns:
439454
True if the queue is empty, False otherwise.
440455
"""
441-
head = await self._list_head(limit=1, lock_time=None)
442-
443-
return len(head.items) == 0 and not self._queue_has_locked_requests
456+
call_time = next(COUNTER)
457+
logger.debug(f'Before _list_head is_empty, {call_time}')
458+
async with self._fetch_lock:
459+
logger.debug(f'During _list_head is_empty, {call_time}')
460+
head = await self._list_head(limit=1, lock_time=None)
461+
logger.debug(f'After _list_head is_empty, {call_time}')
462+
logger.debug(f'Finish _list_head is_empty, {call_time}')
463+
return len(head.items) == 0 and not self._queue_has_locked_requests
444464

445465
async def _ensure_head_is_non_empty(self) -> None:
446466
"""Ensure that the queue head has requests if they are available in the queue."""
@@ -551,8 +571,9 @@ async def _list_head(
551571
A collection of requests from the beginning of the queue.
552572
"""
553573
# Return from cache if available and we're not checking for new forefront requests
574+
call_time = next(COUNTER)
554575
if self._queue_head and not self._should_check_for_forefront_requests:
555-
logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
576+
logger.debug(f'Using cached queue head with {len(self._queue_head)} requests, {call_time}')
556577

557578
# Create a list of requests from the cached queue head
558579
items = []
@@ -571,7 +592,7 @@ async def _list_head(
571592
queue_has_locked_requests=self._queue_has_locked_requests,
572593
lock_time=lock_time,
573594
)
574-
595+
logger.debug(f'Updating cached queue head with {len(self._queue_head)} requests, {call_time}')
575596
leftover_buffer = list[str]()
576597
if self._should_check_for_forefront_requests:
577598
leftover_buffer = list(self._queue_head)
@@ -615,13 +636,14 @@ async def _list_head(
615636
),
616637
hydrated_request=request,
617638
)
618-
639+
logger.debug(f'Adding to head, {call_time}')
619640
self._queue_head.append(request.id)
641+
logger.debug(f'Cached queue head with {len(self._queue_head)} requests, {call_time}')
620642

621643
for leftover_request_id in leftover_buffer:
622644
# After adding new requests to the forefront, any existing leftover locked request is kept in the end.
623645
self._queue_head.append(leftover_request_id)
624-
646+
logger.debug(f'Cached queue head with {len(self._queue_head)} requests, {call_time}')
625647
return RequestQueueHead.model_validate(response)
626648

627649
async def _prolong_request_lock(

tests/integration/test_actor_request_queue.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,40 @@ async def test_request_queue_is_finished(
9898
request_queue_name = generate_unique_resource_name('request_queue')
9999

100100
async with Actor:
101-
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
102-
await request_queue.add_request(Request.from_url('http://example.com'))
103-
assert not await request_queue.is_finished()
101+
try:
102+
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
103+
await request_queue.add_request(Request.from_url('http://example.com'))
104+
assert not await request_queue.is_finished()
105+
106+
request = await request_queue.fetch_next_request()
107+
assert request is not None
108+
assert not await request_queue.is_finished(), (
109+
'RequestQueue should not be finished unless the request is marked as handled.'
110+
)
111+
112+
await request_queue.mark_request_as_handled(request)
113+
assert await request_queue.is_finished()
114+
finally:
115+
await request_queue.drop()
116+
104117

105-
request = await request_queue.fetch_next_request()
106-
assert request is not None
107-
assert not await request_queue.is_finished(), (
108-
'RequestQueue should not be finished unless the request is marked as handled.'
109-
)
118+
async def test_same_request_fetched_twice(
119+
apify_client_async: ApifyClientAsync,
120+
monkeypatch: pytest.MonkeyPatch):
121+
"""Test that the same request can be fetched twice from the request queue."""
122+
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
110123

111-
await request_queue.mark_request_as_handled(request)
112-
assert await request_queue.is_finished()
124+
request_queue_name = generate_unique_resource_name('request_queue')
125+
async with Actor:
126+
try:
127+
request_queue = await Actor.open_request_queue(name='same-request-fetch', force_cloud=request_queue_name)
128+
129+
request = Request.from_url('http://example.com')
130+
await request_queue.add_request(request)
131+
132+
fetched_request_1 = await request_queue.fetch_next_request()
133+
assert fetched_request_1 is not None
134+
assert fetched_request_1.url == 'http://example.com'
135+
await request_queue.reclaim_request(fetched_request_1)
136+
finally:
137+
await request_queue.drop()

tests/integration/test_crawlers_with_storages.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from typing import TYPE_CHECKING
44

5+
import pytest
6+
57
if TYPE_CHECKING:
68
from .conftest import MakeActorFunction, RunActorFunction
79

@@ -76,19 +78,23 @@ async def default_handler(context: ParselCrawlingContext) -> None:
7678
assert run_result.status == 'SUCCEEDED'
7779

7880

79-
async def test_actor_on_platform_max_request_retries(
81+
@pytest.mark.parametrize('_', range(10))
82+
async def test_actor_on_platform_max_request_retries(_,
8083
make_actor: MakeActorFunction,
8184
run_actor: RunActorFunction,
8285
) -> None:
8386
"""Test that the actor respects max_request_retries."""
8487

8588
async def main() -> None:
8689
"""The crawler entry point."""
90+
import logging
91+
8792
from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext
8893

8994
from apify import Actor
9095

9196
async with Actor:
97+
logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
9298
max_retries = 3
9399
crawler = ParselCrawler(max_request_retries=max_retries)
94100
failed_counter = 0

0 commit comments

Comments (0)