Skip to content

Commit 1b92532

Browse files
committed
Do early response validation
1 parent 978d49e commit 1b92532

File tree

2 files changed

+26
-15
lines changed

2 files changed

+26
-15
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -245,20 +245,22 @@ async def add_batch_of_requests(
245245
# Do not try to add previously added requests to avoid pointless expensive calls to API
246246

247247
new_requests: list[Request] = []
248-
already_present_requests: list[dict[str, str | bool]] = []
248+
already_present_requests: list[ProcessedRequest] = []
249249

250250
for request in requests:
251251
if self._requests_cache.get(request.id):
252252
# We are not sure if it was already handled at this point, and it is not worth calling API for it.
253253
# It could have been handled by another client in the meantime, so cached information about
254254
# `request.was_already_handled` is not reliable.
255255
already_present_requests.append(
256-
{
257-
'id': request.id,
258-
'uniqueKey': request.unique_key,
259-
'wasAlreadyPresent': True,
260-
'wasAlreadyHandled': request.was_already_handled,
261-
}
256+
ProcessedRequest.model_validate(
257+
{
258+
'id': request.id,
259+
'uniqueKey': request.unique_key,
260+
'wasAlreadyPresent': True,
261+
'wasAlreadyHandled': request.was_already_handled,
262+
}
263+
)
262264
)
263265

264266
else:
@@ -288,25 +290,29 @@ async def add_batch_of_requests(
288290
]
289291

290292
# Send requests to API.
291-
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
293+
api_response = AddRequestsResponse.model_validate(
294+
await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
295+
)
296+
292297
# Add the locally known already present processed requests based on the local cache.
293-
response['processedRequests'].extend(already_present_requests)
298+
api_response.processed_requests.extend(already_present_requests)
294299

295300
# Remove unprocessed requests from the cache
296-
for unprocessed in response['unprocessedRequests']:
297-
self._requests_cache.pop(unique_key_to_request_id(unprocessed['uniqueKey']), None)
301+
for unprocessed_request in api_response.unprocessed_requests:
302+
self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
298303

299304
else:
300-
response = {'unprocessedRequests': [], 'processedRequests': already_present_requests}
305+
api_response = AddRequestsResponse.model_validate(
306+
{'unprocessedRequests': [], 'processedRequests': already_present_requests}
307+
)
301308

302309
logger.debug(
303310
f'Tried to add new requests: {len(new_requests)}, '
304-
f'succeeded to add new requests: {len(response["processedRequests"]) - len(already_present_requests)}, '
311+
f'succeeded to add new requests: {len(api_response.processed_requests) - len(already_present_requests)}, '
305312
f'skipped already present requests: {len(already_present_requests)}'
306313
)
307314

308315
# Update assumed total count for newly added requests.
309-
api_response = AddRequestsResponse.model_validate(response)
310316
new_request_count = 0
311317
for processed_request in api_response.processed_requests:
312318
if not processed_request.was_already_present and not processed_request.was_already_handled:

tests/integration/test_actor_request_queue.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,12 @@ async def add_requests_worker() -> None:
273273
async def test_request_queue_deduplication_unprocessed_requests(
274274
apify_named_rq: RequestQueue,
275275
) -> None:
276-
"""Test that the deduplication does not add unprocessed requests to the cache."""
276+
"""Test that the deduplication does not add unprocessed requests to the cache.
277+
278+
In this test the first call is "hardcoded" to fail, even on all retries, so it never even sends the API request and
279+
thus has no chance of increasing the `writeCount`. The second call can increase the `writeCount` only if it is not
280+
cached, as cached requests do not make the call (tested in other tests). So this means the `unprocessedRequests`
281+
request was intentionally not cached."""
277282
logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
278283

279284
await asyncio.sleep(10) # Wait to be sure that metadata are updated

0 commit comments

Comments (0)