Skip to content

Commit 059d5ce

Browse files
committed
Remove unnecessary complexity
1 parent 4495516 commit 059d5ce

File tree

2 files changed

+32
-43
lines changed

2 files changed

+32
-43
lines changed

src/apify_client/clients/resource_clients/request_queue.py

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,6 @@ class BatchAddRequestsResult(TypedDict):
3636
unprocessedRequests: list[dict]
3737

3838

39-
def _get_unprocessed_request_from_request(request: dict[str, str]) -> dict[str, str]:
40-
relevant_keys = {'url', 'uniqueKey', 'method'}
41-
return {key: value for key, value in request.items() if key in relevant_keys}
42-
43-
4439
class RequestQueueClient(ResourceClient):
4540
"""Sub-client for manipulating a single request queue."""
4641

@@ -323,14 +318,11 @@ def batch_add_requests(
323318
queue.put(batch)
324319

325320
processed_requests = list[dict]()
326-
unprocessed_requests = dict[str, dict]()
321+
unprocessed_requests = list[dict]()
327322

328323
# Process all batches in the queue sequentially.
329324
while not queue.empty():
330325
request_batch = queue.get()
331-
# All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response.
332-
for request in request_batch:
333-
unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request)
334326

335327
# Send the batch to the API.
336328
response = self.http_client.call(
@@ -343,13 +335,11 @@ def batch_add_requests(
343335

344336
response_parsed = parse_date_fields(pluck_data(response.json()))
345337
processed_requests.extend(response_parsed.get('processedRequests', []))
346-
347-
for processed_request in response_parsed.get('processedRequests', []):
348-
unprocessed_requests.pop(processed_request['uniqueKey'], None)
338+
unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
349339

350340
return {
351341
'processedRequests': processed_requests,
352-
'unprocessedRequests': list(unprocessed_requests.values()),
342+
'unprocessedRequests': unprocessed_requests,
353343
}
354344

355345
def batch_delete_requests(self, requests: list[dict]) -> dict:
@@ -650,39 +640,36 @@ async def _batch_add_requests_worker(
650640
Return result containing lists of processed and unprocessed requests by the worker.
651641
"""
652642
processed_requests = list[dict]()
653-
unprocessed_requests = dict[str, dict]()
643+
unprocessed_requests = list[dict]()
654644

655645
while True:
656646
# Get the next batch from the queue.
657647
try:
658648
request_batch = await queue.get()
659-
# All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response.
660-
for request in request_batch:
661-
unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request)
662-
663649
except asyncio.CancelledError:
664650
break
665651

666-
# Send the batch to the API.
667-
response = await self.http_client.call(
668-
url=self._url('requests/batch'),
669-
method='POST',
670-
params=request_params,
671-
json=list(request_batch),
672-
timeout_secs=_MEDIUM_TIMEOUT,
673-
)
674-
675-
response_parsed = parse_date_fields(pluck_data(response.json()))
676-
processed_requests.extend(response_parsed.get('processedRequests', []))
677-
678-
for processed_request in response_parsed.get('processedRequests', []):
679-
unprocessed_requests.pop(processed_request['uniqueKey'], None)
680-
681-
queue.task_done()
652+
try:
653+
# Send the batch to the API.
654+
response = await self.http_client.call(
655+
url=self._url('requests/batch'),
656+
method='POST',
657+
params=request_params,
658+
json=list(request_batch),
659+
timeout_secs=_MEDIUM_TIMEOUT,
660+
)
661+
662+
response_parsed = parse_date_fields(pluck_data(response.json()))
663+
processed_requests.extend(response_parsed.get('processedRequests', []))
664+
unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
665+
666+
finally:
667+
# Mark the batch as done whether it succeeded or failed.
668+
queue.task_done()
682669

683670
return {
684671
'processedRequests': processed_requests,
685-
'unprocessedRequests': list(unprocessed_requests.values()),
672+
'unprocessedRequests': unprocessed_requests,
686673
}
687674

688675
async def batch_add_requests(

tests/unit/test_client_request_queue.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import pytest
12
import respx
23

4+
import apify_client
35
from apify_client import ApifyClient, ApifyClientAsync
46

57
_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT = """{
@@ -24,8 +26,8 @@
2426

2527

2628
@respx.mock
27-
async def test_batch_not_processed_due_to_exception_async() -> None:
28-
"""Test that all requests are unprocessed unless explicitly stated by the server that they have been processed."""
29+
async def test_batch_not_processed_raises_exception_async() -> None:
30+
"""Test that client exceptions are not silently ignored"""
2931
client = ApifyClientAsync(token='')
3032

3133
respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401))
@@ -35,8 +37,8 @@ async def test_batch_not_processed_due_to_exception_async() -> None:
3537
]
3638
rq_client = client.request_queue(request_queue_id='whatever')
3739

38-
response = await rq_client.batch_add_requests(requests=requests)
39-
assert response['unprocessedRequests'] == requests
40+
with pytest.raises(apify_client._errors.ApifyApiError):
41+
await rq_client.batch_add_requests(requests=requests)
4042

4143

4244
@respx.mock
@@ -58,8 +60,8 @@ async def test_batch_processed_partially_async() -> None:
5860

5961

6062
@respx.mock
61-
def test_batch_not_processed_due_to_exception_sync() -> None:
62-
"""Test that all requests are unprocessed unless explicitly stated by the server that they have been processed."""
63+
def test_batch_not_processed_raises_exception_sync() -> None:
64+
"""Test that client exceptions are not silently ignored"""
6365
client = ApifyClient(token='')
6466

6567
respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401))
@@ -69,8 +71,8 @@ def test_batch_not_processed_due_to_exception_sync() -> None:
6971
]
7072
rq_client = client.request_queue(request_queue_id='whatever')
7173

72-
response = rq_client.batch_add_requests(requests=requests)
73-
assert response['unprocessedRequests'] == requests
74+
with pytest.raises(apify_client._errors.ApifyApiError):
75+
rq_client.batch_add_requests(requests=requests)
7476

7577

7678
@respx.mock

0 commit comments

Comments
 (0)