Merged
Commits
60 commits
5c437c9
Rm old Apify storage clients
vdusek Apr 28, 2025
bf55338
Add init version of new Apify storage clients
vdusek May 9, 2025
6b2f82b
Move specific models from Crawlee to SDK
vdusek Jun 12, 2025
38bef68
Adapt to Crawlee v1
vdusek Jun 18, 2025
1f85430
Adapt to Crawlee v1 (p2)
vdusek Jun 23, 2025
a3d68a2
Fix default storage IDs
vdusek Jun 25, 2025
c77e8d5
Fix integration test and Not implemented exception in purge
vdusek Jun 26, 2025
8731aff
Fix unit tests
vdusek Jun 26, 2025
8dfaffb
fix lint
vdusek Jun 26, 2025
53fad07
add KVS record_exists not implemented
vdusek Jun 26, 2025
5869f8e
update to apify client 1.12 and implement record exists
vdusek Jun 26, 2025
82e65fc
Move default storage IDs to Configuration
vdusek Jun 27, 2025
8de950b
opening storages get default id from config
vdusek Jun 27, 2025
98b76c5
Addressing more feedback
vdusek Jun 27, 2025
7b5ee07
Fixing integration test test_push_large_data_chunks_over_9mb
vdusek Jun 27, 2025
afcb8c7
Abstract open method is removed from storage clients
vdusek Jun 30, 2025
3bacab7
fixing generate public url for KVS records
vdusek Jun 30, 2025
287a119
add async metadata getters
vdusek Jul 1, 2025
e45d65b
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 1, 2025
51178ca
better usage of apify config
vdusek Jul 1, 2025
3cd7dfe
renaming
vdusek Jul 2, 2025
6fe9eb3
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 3, 2025
1547cbd
fixes after merge commit
vdusek Jul 3, 2025
bb47efc
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 4, 2025
4e4fa93
Change from orphan commit to master in crawlee version
Pijukatel Jul 9, 2025
683cb31
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 9, 2025
e5b2bc4
fix encrypted secrets test
vdusek Jul 9, 2025
638756f
Add Apify's version of FS client that keeps the INPUT json
vdusek Jul 10, 2025
931b0ca
update metadata fixes
vdusek Jul 16, 2025
ad7c0d8
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 16, 2025
1f3c481
KVS metadata extended model
vdusek Jul 16, 2025
44d8e09
fix url signing secret key
vdusek Jul 16, 2025
ca72313
Apify storage client fixes and new docs groups
vdusek Jul 19, 2025
bc61fee
Add test for `RequestQueue.is_finished`
Pijukatel Jul 21, 2025
16b76dd
Check `_queue_has_locked_requests` in `is_empty`
Pijukatel Jul 21, 2025
b6e8a5f
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 22, 2025
a3f8c6e
Package structure update
vdusek Jul 22, 2025
594a8e5
Fix request list (HttpResponse.read is now async)
vdusek Jul 22, 2025
e1afe2d
init upgrading guide to v3
vdusek Jul 24, 2025
8ce6902
addres RQ feedback from Pepa
vdusek Jul 25, 2025
42810f0
minor RQ client update
vdusek Jul 25, 2025
9edac0f
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 28, 2025
ec2a9f0
Fix 2 tests in RQ Apify storage client
vdusek Jul 29, 2025
f82d110
Merge branch 'master' into new-apify-storage-clients
vdusek Jul 30, 2025
71ac38d
Update request queue to use manual request tracking
vdusek Aug 3, 2025
a8881dd
httpx vs impit
vdusek Aug 3, 2025
f5189c5
Merge branch 'master' into new-apify-storage-clients
vdusek Aug 5, 2025
89e572e
rm broken crawlers integration tests
vdusek Aug 5, 2025
ae3044e
Try to patch the integration tests for the crawlee branch
Pijukatel Aug 5, 2025
4bc5c91
Add deduplication and test
Pijukatel Aug 5, 2025
70908b3
Add logging for debug
Pijukatel Aug 6, 2025
91ff3fd
Format and type check
Pijukatel Aug 6, 2025
03dcb15
Keep only relevant log
Pijukatel Aug 7, 2025
65b297a
Update to handle parallel requests with same links
Pijukatel Aug 7, 2025
079f890
Merge remote-tracking branch 'origin/master' into add-deduplication
Pijukatel Aug 13, 2025
2c3d0ce
Handle unprocessed requests in deduplication cache correctly
Pijukatel Aug 13, 2025
329baed
Adress review comments
Pijukatel Aug 15, 2025
978d49e
Add deduplication test for `use_extended_unique_key` requests
Pijukatel Aug 15, 2025
1b92532
Do early response validation
Pijukatel Aug 15, 2025
cfdb1e2
Merge remote-tracking branch 'origin/master' into add-deduplication
Pijukatel Aug 15, 2025
77 changes: 67 additions & 10 deletions src/apify/storage_clients/_apify/_request_queue_client.py
@@ -242,20 +242,77 @@ async def add_batch_of_requests(
         Returns:
             Response containing information about the added requests.
         """
-        # Prepare requests for API by converting to dictionaries.
-        requests_dict = [
-            request.model_dump(
-                by_alias=True,
-                exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
+        # Do not try to add previously added requests to avoid pointless expensive calls to API
+
+        new_requests: list[Request] = []
+        already_present_requests: list[ProcessedRequest] = []
+
+        for request in requests:
+            if self._requests_cache.get(request.id):

Review comment (Contributor):

Judging by apify/crawlee#3120, a day may come when we try to limit the size of _requests_cache somehow. Perhaps we should think ahead and come up with a more space-efficient way of tracking already added requests?

EDIT: hollup a minute, do you use the ID here for deduplication instead of unique key?

Reply from @Pijukatel (Contributor Author), Aug 15, 2025:

Since there is a deterministic transformation function, unique_key_to_request_id, which follows the Apify platform's way of creating IDs, this seems OK. If someone starts creating Requests with a custom id, deduplication will most likely stop working.
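
To illustrate why a deterministic key-to-ID mapping makes ID-based lookups equivalent to unique-key lookups, here is a minimal sketch. The function name `sketch_unique_key_to_request_id` is hypothetical, and the scheme (SHA-256, base64, strip non-alphanumeric characters, truncate to 15 characters) is an assumption for illustration; the real `unique_key_to_request_id` may differ in its details.

```python
import re
from base64 import b64encode
from hashlib import sha256


def sketch_unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
    """Derive a short ID from a unique key (illustrative scheme, not the real helper)."""
    # Hash the key so that equal keys always produce equal IDs.
    digest = sha256(unique_key.encode('utf-8')).digest()
    # Base64-encode the hash to get a compact textual form.
    encoded = b64encode(digest).decode('utf-8')
    # Drop the base64 characters that are not alphanumeric.
    alnum_only = re.sub(r'[+/=]', '', encoded)
    # Truncate to the desired ID length.
    return alnum_only[:request_id_length]
```

Under this assumption, two requests with the same unique key always map to the same ID, so a cache keyed by ID deduplicates them. A request created with a custom `id` breaks that link, which is the failure mode described above.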

There are two issues I created based on the discussion about this:

+                # We are not sure if it was already handled at this point, and it is not worth calling API for it.
+                # It could have been handled by another client in the meantime, so cached information about
+                # `request.was_already_handled` is not reliable.
+                already_present_requests.append(
+                    ProcessedRequest.model_validate(
+                        {
+                            'id': request.id,
+                            'uniqueKey': request.unique_key,
+                            'wasAlreadyPresent': True,
+                            'wasAlreadyHandled': request.was_already_handled,
+                        }
+                    )
+                )
+
+            else:
+                # Add new request to the cache.
+                processed_request = ProcessedRequest.model_validate(
+                    {
+                        'id': request.id,
+                        'uniqueKey': request.unique_key,
+                        'wasAlreadyPresent': True,
+                        'wasAlreadyHandled': request.was_already_handled,
+                    }
+                )
+                self._cache_request(
+                    unique_key_to_request_id(request.unique_key),
+                    processed_request,
+                )
+                new_requests.append(request)
+
+        if new_requests:
+            # Prepare requests for API by converting to dictionaries.
+            requests_dict = [
+                request.model_dump(
+                    by_alias=True,
+                    exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
+                )
+                for request in new_requests
+            ]
+
+            # Send requests to API.
+            api_response = AddRequestsResponse.model_validate(
+                await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+            )
+
+            # Add the locally known already present processed requests based on the local cache.
+            api_response.processed_requests.extend(already_present_requests)
+
+            # Remove unprocessed requests from the cache
+            for unprocessed_request in api_response.unprocessed_requests:
+                self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
+
+        else:
+            api_response = AddRequestsResponse.model_validate(
+                {'unprocessedRequests': [], 'processedRequests': already_present_requests}
             )
-            for request in requests
-        ]

-        # Send requests to API.
-        response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+        logger.debug(
+            f'Tried to add new requests: {len(new_requests)}, '
+            f'succeeded to add new requests: {len(api_response.processed_requests) - len(already_present_requests)}, '
+            f'skipped already present requests: {len(already_present_requests)}'
+        )

         # Update assumed total count for newly added requests.
-        api_response = AddRequestsResponse.model_validate(response)
         new_request_count = 0
         for processed_request in api_response.processed_requests:
             if not processed_request.was_already_present and not processed_request.was_already_handled:
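
The cache-first flow in this hunk can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the SDK's implementation: `FakeQueueApi` and `DedupingClient` are hypothetical names, plain unique-key strings stand in for `Request` and `ProcessedRequest` models, and the cache is a set rather than the client's `_requests_cache`.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeQueueApi:
    """Hypothetical stand-in for the remote API; counts calls and can reject keys."""

    reject: set[str] = field(default_factory=set)
    calls: int = 0

    async def batch_add(self, keys: list[str]) -> tuple[list[str], list[str]]:
        self.calls += 1
        processed = [key for key in keys if key not in self.reject]
        unprocessed = [key for key in keys if key in self.reject]
        return processed, unprocessed


@dataclass
class DedupingClient:
    """Skips keys it has already sent, mirroring the cache-first flow of the diff."""

    api: FakeQueueApi
    cache: set[str] = field(default_factory=set)

    async def add_batch(self, keys: list[str]) -> dict[str, list[str]]:
        new: list[str] = []
        already_present: list[str] = []
        for key in keys:
            if key in self.cache:
                already_present.append(key)
            else:
                # Cache before the API call so later duplicates are skipped.
                self.cache.add(key)
                new.append(key)

        if not new:
            # Nothing new: answer from the cache without calling the API.
            return {'processed': already_present, 'unprocessed': []}

        processed, unprocessed = await self.api.batch_add(new)
        # Forget keys the API did not process so they can be retried later.
        for key in unprocessed:
            self.cache.discard(key)
        return {'processed': processed + already_present, 'unprocessed': unprocessed}
```

The design choice worth noting is the eviction step: because keys are cached before the API call, any key the API reports as unprocessed has to be removed again, otherwise a later retry would be skipped as a false duplicate.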