Skip to content

Commit 61d7a39

Browse files
authored
fix: Hotfix for batch_add_requests batch size limit (#261)
1 parent 9e13680 commit 61d7a39

File tree

2 files changed

+80
-3
lines changed

2 files changed

+80
-3
lines changed

src/apify/apify_storage_client/_request_queue_client.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from typing import TYPE_CHECKING
44

5+
from more_itertools import chunked
56
from typing_extensions import override
67

78
from crawlee import Request
@@ -157,8 +158,11 @@ async def batch_add_requests(
157158
*,
158159
forefront: bool = False,
159160
) -> BatchRequestsOperationResponse:
160-
return BatchRequestsOperationResponse.model_validate(
161-
await self._client.batch_add_requests(
161+
processed = []
162+
unprocessed = []
163+
164+
for chunk in chunked(requests, 25): # The API endpoint won't accept more than 25 requests at once
165+
response = await self._client.batch_add_requests(
162166
requests=[
163167
r.model_dump(
164168
by_alias=True,
@@ -170,10 +174,18 @@ async def batch_add_requests(
170174
'data',
171175
},
172176
)
173-
for r in requests
177+
for r in chunk
174178
],
175179
forefront=forefront,
176180
)
181+
processed.extend(response['processedRequests'])
182+
unprocessed.extend(response['unprocessedRequests'])
183+
184+
return BatchRequestsOperationResponse.model_validate(
185+
{
186+
'processedRequests': processed,
187+
'unprocessedRequests': unprocessed,
188+
}
177189
)
178190

179191
@override

tests/integration/test_request_queue.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,68 @@ async def main() -> None:
3939
run_result = await actor.call()
4040
assert run_result is not None
4141
assert run_result['status'] == 'SUCCEEDED'
42+
43+
async def test_batch(self, make_actor: ActorFactory) -> None:
    """Batch-add 100 distinct URLs to the default request queue and drain it.

    The actor body enqueues the URLs with a single ``add_requests_batched`` call,
    then fetches and marks requests as handled until the queue returns nothing.
    It asserts that exactly 100 requests were handled, that none of them had
    already been handled, and that the queue reports itself as finished.
    The outer test only checks that the actor run ends with status ``SUCCEEDED``.
    """

    async def main() -> None:
        async with Actor:
            expected_total = 100
            print('Opening request queue...')
            # I have seen it get stuck on this call
            queue = await Actor.open_request_queue()

            # Enqueue every URL with one batched call
            urls = [f'https://example.com/{i}' for i in range(expected_total)]
            await queue.add_requests_batched(urls)

            handled_total = 0
            while True:
                request = await queue.fetch_next_request()
                if not request:
                    # Nothing more to fetch
                    break
                print('Fetching next request...')
                operation_info = await queue.mark_request_as_handled(request)
                assert operation_info is not None
                assert operation_info.was_already_handled is False
                handled_total += 1

            assert handled_total == expected_total
            print('Waiting for queue to be finished...')
            finished = await queue.is_finished()
            assert finished is True

    actor = await make_actor('rq-batch-test', main_func=main)

    result = await actor.call()
    assert result is not None
    assert result['status'] == 'SUCCEEDED'
71+
72+
async def test_batch_non_unique(self, make_actor: ActorFactory) -> None:
    """Batch-add requests with colliding unique keys and expect only the unique ones back.

    The actor body builds 100 requests. Each request whose index ``i`` satisfies
    ``i % 4 == 1`` is given the unique key of request ``i - 1``, so one request in
    every group of four is a duplicate. After a single ``add_requests_batched``
    call the queue is drained, and the body asserts that exactly three quarters of
    the requests were handled, that none had already been handled, and that the
    queue reports itself as finished.
    The outer test only checks that the actor run ends with status ``SUCCEEDED``.
    """

    async def main() -> None:
        from crawlee import Request

        async with Actor:
            desired_request_count = 100
            # One of every four requests reuses its predecessor's unique key.
            # Integer division keeps the expected value an int, like the counter.
            expected_unique_count = desired_request_count * 3 // 4

            print('Opening request queue...')
            # I have seen it get stuck on this call
            rq = await Actor.open_request_queue()
            # Add some requests; indices 1, 5, 9, ... collide with 0, 4, 8, ...
            await rq.add_requests_batched(
                [
                    Request.from_url(f'https://example.com/{i}', unique_key=str(i - 1 if i % 4 == 1 else i))
                    for i in range(desired_request_count)
                ]
            )

            handled_request_count = 0
            while next_request := await rq.fetch_next_request():
                print('Fetching next request...')
                queue_operation_info = await rq.mark_request_as_handled(next_request)
                assert queue_operation_info is not None
                assert queue_operation_info.was_already_handled is False
                handled_request_count += 1

            assert handled_request_count == expected_unique_count
            print('Waiting for queue to be finished...')
            is_finished = await rq.is_finished()
            assert is_finished is True

    # Use a label distinct from test_batch so the two tests' actors can be told apart
    actor = await make_actor('rq-batch-non-unique-test', main_func=main)

    run_result = await actor.call()
    assert run_result is not None
    assert run_result['status'] == 'SUCCEEDED'

0 commit comments

Comments
 (0)