Skip to content

Commit 2c3d0ce

Browse files
committed
Handle unprocessed requests in deduplication cache correctly
1 parent 079f890 commit 2c3d0ce

File tree

2 files changed

+94
-46
lines changed

2 files changed

+94
-46
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,17 @@ async def add_batch_of_requests(
289289
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
290290
# Add the locally known already present processed requests based on the local cache.
291291
response['processedRequests'].extend(already_present_requests)
292+
293+
# Remove unprocessed requests from the cache
294+
for unprocessed in response['unprocessedRequests']:
295+
self._requests_cache.pop(unique_key_to_request_id(unprocessed['uniqueKey']), None)
296+
292297
else:
293298
response = {'unprocessedRequests': [], 'processedRequests': already_present_requests}
294299

295300
logger.debug(
296-
f'Added new requests: {len(new_requests)}, '
301+
f'Tried to add new requests: {len(new_requests)}, '
302+
f'succeeded to add new requests: {len(response["processedRequests"])}, '
297303
f'skipped already present requests: {len(already_present_requests)}'
298304
)
299305

tests/integration/test_actor_request_queue.py

Lines changed: 87 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,40 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING
3+
import asyncio
4+
import logging
5+
from typing import TYPE_CHECKING, Any
6+
from unittest import mock
7+
8+
import pytest
49

510
from apify_shared.consts import ApifyEnvVars
611

712
from ._utils import generate_unique_resource_name
813
from apify import Actor, Request
914

1015
if TYPE_CHECKING:
11-
import pytest
16+
from collections.abc import AsyncGenerator
1217

1318
from apify_client import ApifyClientAsync
19+
from crawlee.storages import RequestQueue
1420

1521
from .conftest import MakeActorFunction, RunActorFunction
1622

1723

24+
@pytest.fixture
25+
async def apify_named_rq(
26+
apify_client_async: ApifyClientAsync, monkeypatch: pytest.MonkeyPatch
27+
) -> AsyncGenerator[RequestQueue]:
28+
assert apify_client_async.token
29+
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
30+
request_queue_name = generate_unique_resource_name('request_queue')
31+
32+
async with Actor:
33+
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
34+
yield request_queue
35+
await request_queue.drop()
36+
37+
1838
async def test_same_references_in_default_rq(
1939
make_actor: MakeActorFunction,
2040
run_actor: RunActorFunction,
@@ -61,58 +81,36 @@ async def main() -> None:
6181

6282
async def test_force_cloud(
6383
apify_client_async: ApifyClientAsync,
64-
monkeypatch: pytest.MonkeyPatch,
84+
apify_named_rq: RequestQueue,
6585
) -> None:
66-
assert apify_client_async.token is not None
67-
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
68-
69-
request_queue_name = generate_unique_resource_name('request_queue')
70-
71-
async with Actor:
72-
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
73-
request_queue_id = (await request_queue.get_metadata()).id
74-
75-
request_info = await request_queue.add_request(Request.from_url('http://example.com'))
76-
86+
request_queue_id = (await apify_named_rq.get_metadata()).id
87+
request_info = await apify_named_rq.add_request(Request.from_url('http://example.com'))
7788
request_queue_client = apify_client_async.request_queue(request_queue_id)
7889

79-
try:
80-
request_queue_details = await request_queue_client.get()
81-
assert request_queue_details is not None
82-
assert request_queue_details.get('name') == request_queue_name
90+
request_queue_details = await request_queue_client.get()
91+
assert request_queue_details is not None
92+
assert request_queue_details.get('name') == apify_named_rq.name
8393

84-
request_queue_request = await request_queue_client.get_request(request_info.id)
85-
assert request_queue_request is not None
86-
assert request_queue_request['url'] == 'http://example.com'
87-
finally:
88-
await request_queue_client.delete()
94+
request_queue_request = await request_queue_client.get_request(request_info.id)
95+
assert request_queue_request is not None
96+
assert request_queue_request['url'] == 'http://example.com'
8997

9098

9199
async def test_request_queue_is_finished(
92-
apify_client_async: ApifyClientAsync,
93-
monkeypatch: pytest.MonkeyPatch,
100+
apify_named_rq: RequestQueue,
94101
) -> None:
95-
assert apify_client_async.token is not None
96-
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
97-
98-
request_queue_name = generate_unique_resource_name('request_queue')
99-
100-
async with Actor:
101-
try:
102-
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
103-
await request_queue.add_request(Request.from_url('http://example.com'))
104-
assert not await request_queue.is_finished()
102+
request_queue = await Actor.open_request_queue(name=apify_named_rq.name, force_cloud=True)
103+
await request_queue.add_request(Request.from_url('http://example.com'))
104+
assert not await request_queue.is_finished()
105105

106-
request = await request_queue.fetch_next_request()
107-
assert request is not None
108-
assert not await request_queue.is_finished(), (
109-
'RequestQueue should not be finished unless the request is marked as handled.'
110-
)
106+
request = await request_queue.fetch_next_request()
107+
assert request is not None
108+
assert not await request_queue.is_finished(), (
109+
'RequestQueue should not be finished unless the request is marked as handled.'
110+
)
111111

112-
await request_queue.mark_request_as_handled(request)
113-
assert await request_queue.is_finished()
114-
finally:
115-
await request_queue.drop()
112+
await request_queue.mark_request_as_handled(request)
113+
assert await request_queue.is_finished()
116114

117115

118116
async def test_request_queue_deduplication(
@@ -176,7 +174,7 @@ async def main() -> None:
176174
async with Actor:
177175
logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
178176

179-
requests = [Request.from_url(f'http://example.com/{i}') for i in range(1000)]
177+
requests = [Request.from_url(f'http://example.com/{i}') for i in range(100)]
180178
rq = await Actor.open_request_queue()
181179

182180
await asyncio.sleep(10) # Wait to be sure that metadata are updated
@@ -207,3 +205,47 @@ async def add_requests_worker() -> None:
207205
run_result = await run_actor(actor)
208206

209207
assert run_result.status == 'SUCCEEDED'
208+
209+
210+
async def test_request_queue_deduplication_unprocessed_requests(
211+
apify_named_rq: RequestQueue,
212+
) -> None:
213+
"""Test that the deduplication does not add unprocessed requests to the cache."""
214+
logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
215+
216+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
217+
218+
# Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
219+
rq_client = Actor.apify_client.request_queue(request_queue_id=apify_named_rq.id)
220+
_rq = await rq_client.get()
221+
assert _rq
222+
stats_before = _rq.get('stats', {})
223+
Actor.log.info(stats_before)
224+
225+
def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dict[str, list[dict]]:
226+
"""Simulate API returning unprocessed requests."""
227+
return {
228+
'processedRequests': [],
229+
'unprocessedRequests': [
230+
{'url': request['url'], 'uniqueKey': request['uniqueKey'], 'method': request['method']}
231+
for request in requests
232+
],
233+
}
234+
235+
with mock.patch(
236+
'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
237+
side_effect=return_unprocessed_requests,
238+
):
239+
# Simulate failed API call for adding requests. Request was not processed and should not be cached.
240+
await apify_named_rq.add_requests(['http://example.com/1'])
241+
242+
# This will succeed.
243+
await apify_named_rq.add_requests(['http://example.com/1'])
244+
245+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
246+
_rq = await rq_client.get()
247+
assert _rq
248+
stats_after = _rq.get('stats', {})
249+
Actor.log.info(stats_after)
250+
251+
assert (stats_after['writeCount'] - stats_before['writeCount']) == 1

0 commit comments

Comments (0)