
Commit 5dda869

first naive implementation with semaphore
1 parent ae0ea6f commit 5dda869

File tree

1 file changed: +69 -13


src/apify_client/clients/resource_clients/request_queue.py

Lines changed: 69 additions & 13 deletions
@@ -1,13 +1,21 @@
 from __future__ import annotations
 
+import asyncio
+import math
+from datetime import timedelta
 from typing import Any
 
 from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
+from httpx import Response
 
 from apify_client._errors import ApifyApiError
 from apify_client._utils import catch_not_found_or_throw, pluck_data
 from apify_client.clients.base import ResourceClient, ResourceClientAsync
 
+_RQ_MAX_REQUESTS_PER_BATCH = 25
+_MAX_PAYLOAD_SIZE_BYTES = 9 * 1024 * 1024  # 9 MB
+_SAFETY_BUFFER_PERCENT = 0.01 / 100  # 0.01%
+
 
 class RequestQueueClient(ResourceClient):
     """Sub-client for manipulating a single request queue."""
@@ -244,15 +252,19 @@ def batch_add_requests(
         requests: list[dict],
         *,
         forefront: bool | None = None,
+        max_unprocessed_requests_retries: int = 3,
+        max_parallel: int = 5,
+        min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500),
     ) -> dict:
         """Add requests to the queue.
 
         https://docs.apify.com/api/v2#/reference/request-queues/batch-request-operations/add-requests
 
         Args:
-            requests (list[dict]): list of the requests to add
-            forefront (bool, optional): Whether to add the requests to the head or the end of the queue
+            requests: list of the requests to add
+            forefront: Whether to add the requests to the head or the end of the queue
         """
+        # TODO
         request_params = self._params(clientKey=self.client_key, forefront=forefront)
 
         response = self.http_client.call(
@@ -540,29 +552,73 @@ async def delete_request_lock(
             params=request_params,
         )
 
+    async def _batch_add_requests_inner(
+        self,
+        semaphore: asyncio.Semaphore,
+        request_params: dict,
+        batch: list[dict],
+    ) -> Response:
+        async with semaphore:
+            return await self.http_client.call(
+                url=self._url('requests/batch'),
+                method='POST',
+                params=request_params,
+                json=batch,
+            )
+
     async def batch_add_requests(
         self: RequestQueueClientAsync,
         requests: list[dict],
         *,
-        forefront: bool | None = None,
-    ) -> dict:
+        forefront: bool = False,
+        max_unprocessed_requests_retries: int = 3,
+        max_parallel: int = 5,
+        min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500),
+    ) -> list[dict]:
         """Add requests to the queue.
 
         https://docs.apify.com/api/v2#/reference/request-queues/batch-request-operations/add-requests
 
         Args:
-            requests (list[dict]): list of the requests to add
-            forefront (bool, optional): Whether to add the requests to the head or the end of the queue
+            requests: List of requests to add.
+            forefront: Whether to add the requests to the head or the end of the queue.
+            max_unprocessed_requests_retries: Number of retries for unprocessed requests.
+            max_parallel: Maximum number of parallel operations.
+            min_delay_between_unprocessed_requests_retries: Minimum delay between retries for unprocessed requests.
         """
+        payload_size_limit_bytes = _MAX_PAYLOAD_SIZE_BYTES - math.ceil(_MAX_PAYLOAD_SIZE_BYTES * _SAFETY_BUFFER_PERCENT)
+
+        tasks = set[asyncio.Task]()
+
+        responses = list[dict]()
+
         request_params = self._params(clientKey=self.client_key, forefront=forefront)
 
-        response = await self.http_client.call(
-            url=self._url('requests/batch'),
-            method='POST',
-            params=request_params,
-            json=requests,
-        )
-        return parse_date_fields(pluck_data(response.json()))
+        semaphore = asyncio.Semaphore(max_parallel)
+
+        number_of_iterations = math.ceil(len(requests) / _RQ_MAX_REQUESTS_PER_BATCH)
+
+        for i in range(number_of_iterations):
+            start = i * _RQ_MAX_REQUESTS_PER_BATCH
+            end = (i + 1) * _RQ_MAX_REQUESTS_PER_BATCH
+            batch = requests[start:end]
+
+            task = asyncio.create_task(
+                coro=self._batch_add_requests_inner(
+                    semaphore=semaphore,
+                    request_params=request_params,
+                    batch=batch,
+                ),
+                name=f'batch_add_requests_{i}',
+            )
+
+            tasks.add(task)
+            task.add_done_callback(lambda response: responses.append(response.result().json()))
+            task.add_done_callback(lambda _: tasks.remove(task))
+
+        asyncio.gather(*tasks)
+
+        return [parse_date_fields(pluck_data(response)) for response in responses]
 
     async def batch_delete_requests(self: RequestQueueClientAsync, requests: list[dict]) -> dict:
         """Delete given requests from the queue.
