Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/apify/apify_storage_client/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import TYPE_CHECKING

from more_itertools import chunked
from typing_extensions import override

from crawlee import Request
Expand Down Expand Up @@ -157,8 +158,11 @@ async def batch_add_requests(
*,
forefront: bool = False,
) -> BatchRequestsOperationResponse:
return BatchRequestsOperationResponse.model_validate(
await self._client.batch_add_requests(
processed = []
unprocessed = []

for chunk in chunked(requests, 25): # The API endpoint won't accept more than 25 requests at once
response = await self._client.batch_add_requests(
requests=[
r.model_dump(
by_alias=True,
Expand All @@ -170,10 +174,18 @@ async def batch_add_requests(
'data',
},
)
for r in requests
for r in chunk
],
forefront=forefront,
)
processed.extend(response['processedRequests'])
unprocessed.extend(response['unprocessedRequests'])

return BatchRequestsOperationResponse.model_validate(
{
'processedRequests': processed,
'unprocessedRequests': unprocessed,
}
)

@override
Expand Down
65 changes: 65 additions & 0 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,68 @@ async def main() -> None:
run_result = await actor.call()
assert run_result is not None
assert run_result['status'] == 'SUCCEEDED'

async def test_batch(self, make_actor: ActorFactory) -> None:
    """Run an actor that enqueues 100 URLs in one batched call and drains the queue.

    Verifies that every enqueued request is fetched and handled exactly once and
    that the queue reports itself finished afterwards.
    """

    async def main() -> None:
        async with Actor:
            expected_count = 100
            print('Opening request queue...')
            # NOTE(review): opening the queue has been observed to hang occasionally.
            rq = await Actor.open_request_queue()

            # Enqueue the whole batch of plain-URL requests in a single call.
            urls = [f'https://example.com/{i}' for i in range(expected_count)]
            await rq.add_requests_batched(urls)

            handled = 0
            while True:
                request = await rq.fetch_next_request()
                if not request:
                    break
                print('Fetching next request...')
                operation_info = await rq.mark_request_as_handled(request)
                assert operation_info is not None
                assert operation_info.was_already_handled is False
                handled += 1

            # Every request should have been seen exactly once.
            assert handled == expected_count
            print('Waiting for queue to be finished...')
            assert await rq.is_finished() is True

    actor = await make_actor('rq-batch-test', main_func=main)

    run_result = await actor.call()
    assert run_result is not None
    assert run_result['status'] == 'SUCCEEDED'

async def test_batch_non_unique(self, make_actor: ActorFactory) -> None:
    """Run an actor that batch-enqueues requests with deliberately duplicated unique keys.

    Every request whose index ``i`` satisfies ``i % 4 == 1`` reuses the previous
    index's unique key, so 25 of the 100 requests are duplicates and only 75
    distinct requests should be handled.
    """

    async def main() -> None:
        from crawlee import Request

        async with Actor:
            desired_request_count = 100
            print('Opening request queue...')
            # NOTE(review): opening the queue has been observed to hang occasionally.
            rq = await Actor.open_request_queue()
            # Indices i ≡ 1 (mod 4) reuse the key of index i-1, producing
            # 25 duplicates out of the 100 requests.
            await rq.add_requests_batched(
                [
                    Request.from_url(f'https://example.com/{i}', unique_key=str(i - 1 if i % 4 == 1 else i))
                    for i in range(desired_request_count)
                ]
            )

            handled_request_count = 0
            while next_request := await rq.fetch_next_request():
                print('Fetching next request...')
                queue_operation_info = await rq.mark_request_as_handled(next_request)
                assert queue_operation_info is not None
                assert queue_operation_info.was_already_handled is False
                handled_request_count += 1

            # Integer arithmetic (// 4) instead of / 4 avoids an int-vs-float comparison.
            assert handled_request_count == desired_request_count * 3 // 4
            print('Waiting for queue to be finished...')
            is_finished = await rq.is_finished()
            assert is_finished is True

    # Distinct actor name: the original reused 'rq-batch-test', colliding with test_batch.
    actor = await make_actor('rq-batch-test-non-unique', main_func=main)

    run_result = await actor.call()
    assert run_result is not None
    assert run_result['status'] == 'SUCCEEDED'