4 changes: 2 additions & 2 deletions pyproject.toml
@@ -34,9 +34,9 @@ keywords = [
     "scraping",
 ]
 dependencies = [
-    "apify-client>=2.0.0,<3.0.0",
+    "apify-client@git+https://github.com/apify/apify-client-python.git@remove-request-id",
     "apify-shared>=2.0.0,<3.0.0",
-    "crawlee@git+https://github.com/apify/crawlee-python.git@master",
+    "crawlee@git+https://github.com/apify/crawlee-python.git@remove-request-id",
     "cachetools>=5.5.0",
     "cryptography>=42.0.0",
     "httpx>=0.27.0",
3 changes: 1 addition & 2 deletions src/apify/scrapy/requests.py
@@ -122,7 +122,7 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequest:
 
     # Update the meta field with the meta field from the apify_request
     meta = scrapy_request.meta or {}
-    meta.update({'apify_request_id': apify_request.id, 'apify_request_unique_key': apify_request.unique_key})
+    meta.update({'apify_request_unique_key': apify_request.unique_key})
     # scrapy_request.meta is a property, so we have to set it like this
     scrapy_request._meta = meta  # noqa: SLF001
 
@@ -134,7 +134,6 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequest:
             url=apify_request.url,
             method=apify_request.method,
             meta={
-                'apify_request_id': apify_request.id,
                 'apify_request_unique_key': apify_request.unique_key,
             },
         )
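After this change, the only Apify-specific key that `to_scrapy_request` leaves in Scrapy's `meta` is `apify_request_unique_key`; `apify_request_id` is gone. A minimal sketch of how a downstream callback would read it (the spider class and URL are illustrative, not part of the PR):

```python
from scrapy import Request, Spider


class ExampleSpider(Spider):
    name = 'example'

    def start_requests(self):
        yield Request('https://example.com')

    def parse(self, response):
        # Only the unique key survives the Apify -> Scrapy conversion now;
        # there is no 'apify_request_id' entry to fall back on.
        unique_key = response.meta.get('apify_request_unique_key')
        self.logger.info('Handling request with unique key: %s', unique_key)
```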
4 changes: 2 additions & 2 deletions src/apify/storage_clients/_apify/_models.py
@@ -94,8 +94,8 @@ class CachedRequest(BaseModel):
     Only internal structure.
     """
 
-    id: str
-    """The ID of the request."""
+    unique_key: str
+    """Unique key of the request."""
 
     was_already_handled: bool
    """Whether the request was already handled."""
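With the `id` field dropped, `CachedRequest` is both keyed and populated by the unique key alone. A hedged sketch of the resulting cache usage, with the field set taken from the `_cache_request` hunk further down (the internal import and `maxsize` are for illustration only):

```python
from cachetools import LRUCache

from apify.storage_clients._apify._models import CachedRequest

# The unique key doubles as the cache key; no separate request ID exists.
cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=1000)

cache['https://example.com'] = CachedRequest(
    unique_key='https://example.com',
    was_already_handled=False,
    hydrated=None,  # populated later with the full Request object
    lock_expires_at=None,
)
```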
71 changes: 33 additions & 38 deletions src/apify/storage_clients/_apify/_request_queue_client.py
@@ -10,7 +10,6 @@
 from typing_extensions import override
 
 from apify_client import ApifyClientAsync
-from crawlee._utils.requests import unique_key_to_request_id
 from crawlee.storage_clients._base import RequestQueueClient
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
 
@@ -59,10 +58,10 @@ def __init__(
         """The name of the request queue."""
 
         self._queue_head = deque[str]()
-        """A deque to store request IDs in the queue head."""
+        """A deque to store request unique keys in the queue head."""
 
         self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=self._MAX_CACHED_REQUESTS)
-        """A cache to store request objects. Request ID is used as the cache key."""
+        """A cache to store request objects. Request unique key is used as the cache key."""
 
         self._queue_has_locked_requests: bool | None = None
         """Whether the queue has requests locked by another client."""
@@ -248,14 +247,13 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []
 
         for request in requests:
-            if self._requests_cache.get(request.id):
+            if self._requests_cache.get(request.unique_key):
                 # We are not sure if it was already handled at this point, and it is not worth calling API for it.
                 # It could have been handled by another client in the meantime, so cached information about
                 # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'id': request.id,
                             'uniqueKey': request.unique_key,
                             'wasAlreadyPresent': True,
                             'wasAlreadyHandled': request.was_already_handled,
@@ -267,14 +265,13 @@
                 # Add new request to the cache.
                 processed_request = ProcessedRequest.model_validate(
                     {
-                        'id': request.id,
                         'uniqueKey': request.unique_key,
                         'wasAlreadyPresent': True,
                         'wasAlreadyHandled': request.was_already_handled,
                     }
                 )
                 self._cache_request(
-                    unique_key_to_request_id(request.unique_key),
+                    request.unique_key,
                     processed_request,
                 )
                 new_requests.append(request)
@@ -299,7 +296,7 @@
 
             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
+                self._requests_cache.pop(unprocessed_request.unique_key, None)
 
         else:
             api_response = AddRequestsResponse.model_validate(
@@ -323,16 +320,16 @@
         return api_response
 
     @override
-    async def get_request(self, request_id: str) -> Request | None:
+    async def get_request(self, request_unique_key: str) -> Request | None:
         """Get a request by ID.
 
         Args:
-            request_id: The ID of the request to get.
+            request_unique_key: Unique key of the request to get.
 
         Returns:
             The request or None if not found.
         """
-        response = await self._api_client.get_request(request_id)
+        response = await self._api_client.get_request_by_unique_key(request_unique_key)
 
         if response is None:
             return None
@@ -381,7 +378,7 @@ async def fetch_next_request(self) -> Request | None:
             return None
 
         # Use get request to ensure we have the full request object.
-        request = await self.get_request(request.id)
+        request = await self.get_request(request.unique_key)
         if request is None:
             logger.debug(
                 'Request fetched from the beginning of queue was not found in the RQ',
@@ -407,7 +404,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
 
-        if cached_request := self._requests_cache[request.id]:
+        if cached_request := self._requests_cache[request.unique_key]:
             cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
@@ -419,14 +416,14 @@
             self._assumed_handled_count += 1
 
             # Update the cache with the handled request
-            cache_key = unique_key_to_request_id(request.unique_key)
+            cache_key = request.unique_key
             self._cache_request(
                 cache_key,
                 processed_request,
                 hydrated_request=request,
             )
         except Exception as exc:
-            logger.debug(f'Error marking request {request.id} as handled: {exc!s}')
+            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
             return None
         else:
             return processed_request
@@ -467,7 +464,7 @@ async def reclaim_request(
             self._assumed_handled_count -= 1
 
             # Update the cache
-            cache_key = unique_key_to_request_id(request.unique_key)
+            cache_key = request.unique_key
             self._cache_request(
                 cache_key,
                 processed_request,
@@ -481,11 +478,11 @@
 
             # Try to release the lock on the request
             try:
-                await self._delete_request_lock(request.id, forefront=forefront)
+                await self._delete_request_lock(request.unique_key, forefront=forefront)
             except Exception as err:
-                logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
+                logger.debug(f'Failed to delete request lock for request {request.unique_key}', exc_info=err)
         except Exception as exc:
-            logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
+            logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
             return None
         else:
             return processed_request
@@ -554,7 +551,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
             return None
 
         # Update cache with hydrated request
-        cache_key = unique_key_to_request_id(request.unique_key)
+        cache_key = request.unique_key
         self._cache_request(
             cache_key,
             ProcessedRequest(
@@ -592,7 +589,7 @@ async def _update_request(
         )
 
         return ProcessedRequest.model_validate(
-            {'id': request.id, 'uniqueKey': request.unique_key} | response,
+            {'uniqueKey': request.unique_key} | response,
         )
 
     async def _list_head(
@@ -653,28 +650,26 @@ async def _list_head(
             request = Request.model_validate(request_data)
 
             # Skip requests without ID or unique key
-            if not request.id or not request.unique_key:
+            if not request.unique_key:
                 logger.debug(
                     'Skipping request from queue head, missing ID or unique key',
                     extra={
-                        'id': request.id,
                         'unique_key': request.unique_key,
                     },
                 )
                 continue
 
             # Cache the request
             self._cache_request(
-                unique_key_to_request_id(request.unique_key),
+                request.unique_key,
                 ProcessedRequest(
-                    id=request.id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=False,
                 ),
                 hydrated_request=request,
             )
-            self._queue_head.append(request.id)
+            self._queue_head.append(request.unique_key)
 
         for leftover_request_id in leftover_buffer:
             # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
@@ -683,21 +678,21 @@
 
     async def _prolong_request_lock(
         self,
-        request_id: str,
+        request_unique_key: str,
         *,
         lock_secs: int,
     ) -> ProlongRequestLockResponse:
         """Prolong the lock on a specific request in the queue.
 
         Args:
-            request_id: The identifier of the request whose lock is to be prolonged.
+            request_unique_key: Unique key of the request whose lock is to be prolonged.
             lock_secs: The additional amount of time, in seconds, that the request will remain locked.
 
         Returns:
             A response containing the time at which the lock will expire.
         """
-        response = await self._api_client.prolong_request_lock(
-            request_id=request_id,
+        response = await self._api_client.prolong_request_lock_by_unique_key(
+            request_unique_key=request_unique_key,
             # All requests reaching this code were the tip of the queue at the moment when they were fetched,
             # so if their lock expires, they should be put back to the forefront as their handling is long overdue.
             forefront=True,
@@ -710,37 +705,37 @@ async def _prolong_request_lock(
 
         # Update the cache with the new lock expiration
        for cached_request in self._requests_cache.values():
-            if cached_request.id == request_id:
+            if cached_request.unique_key == request_unique_key:
                cached_request.lock_expires_at = result.lock_expires_at
                break
 
        return result
 
    async def _delete_request_lock(
        self,
-        request_id: str,
+        request_unique_key: str,
        *,
        forefront: bool = False,
    ) -> None:
        """Delete the lock on a specific request in the queue.
 
        Args:
-            request_id: ID of the request to delete the lock.
+            request_unique_key: Unique key of the request to delete the lock.
            forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
        """
        try:
-            await self._api_client.delete_request_lock(
-                request_id=request_id,
+            await self._api_client.delete_request_lock_by_unique_key(
+                request_unique_key=request_unique_key,
                forefront=forefront,
            )
 
            # Update the cache to remove the lock
            for cached_request in self._requests_cache.values():
-                if cached_request.id == request_id:
+                if cached_request.unique_key == request_unique_key:
                    cached_request.lock_expires_at = None
                    break
        except Exception as err:
-            logger.debug(f'Failed to delete request lock for request {request_id}', exc_info=err)
+            logger.debug(f'Failed to delete request lock for request {request_unique_key}', exc_info=err)
 
    def _cache_request(
        self,
@@ -758,7 +753,7 @@ def _cache_request(
             hydrated_request: The hydrated request object, if available.
         """
         self._requests_cache[cache_key] = CachedRequest(
-            id=processed_request.id,
+            unique_key=processed_request.unique_key,
             was_already_handled=processed_request.was_already_handled,
             hydrated=hydrated_request,
             lock_expires_at=None,
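Taken together, the queue client now addresses requests exclusively by unique key, and lock management goes through the new `*_by_unique_key` endpoints of the Apify API client. A rough sketch of the changed consumer-facing flow, using only methods that appear in the hunks above (assumes an already-opened client instance from this file; error handling elided):

```python
async def handle_next(client) -> None:
    """Fetch one request, re-hydrate it by unique key, and mark it handled."""
    request = await client.fetch_next_request()
    if request is None:
        return

    # Lookups are keyed by unique key now; get_request() no longer
    # accepts a request ID.
    hydrated = await client.get_request(request.unique_key)
    if hydrated is not None:
        await client.mark_request_as_handled(hydrated)
```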
2 changes: 1 addition & 1 deletion tests/integration/actor_source_base/requirements.txt
@@ -1,4 +1,4 @@
 # The test fixture will put the Apify SDK wheel path on the next line
 APIFY_SDK_WHEEL_PLACEHOLDER
 uvicorn[standard]
-crawlee[parsel] @ git+https://github.com/apify/crawlee-python.git@master
+crawlee[parsel] @ git+https://github.com/apify/crawlee-python.git@remove-request-id
29 changes: 13 additions & 16 deletions tests/integration/test_request_queue.py
@@ -399,38 +399,35 @@ async def main() -> None:
     assert run_result.status == 'SUCCEEDED'
 
 
-async def test_get_request_by_id(
+async def test_get_request_by_unique_key(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
 ) -> None:
-    """Test retrieving specific requests by their ID."""
+    """Test retrieving specific requests by their unique_key."""
 
     async def main() -> None:
         async with Actor:
             rq = await Actor.open_request_queue()
             Actor.log.info('Request queue opened')
 
-            # Add a request and get its ID
+            # Add a request and get its unique_key
             add_result = await rq.add_request('https://example.com/test')
-            request_id = add_result.id
-            Actor.log.info(f'Request added with ID: {request_id}')
+            request_unique_key = add_result.unique_key
+            Actor.log.info(f'Request added with unique_key: {request_unique_key}')
 
-            # Retrieve the request by ID
-            retrieved_request = await rq.get_request(request_id)
+            # Retrieve the request by unique_key
+            retrieved_request = await rq.get_request(request_unique_key)
             assert retrieved_request is not None, f'retrieved_request={retrieved_request}'
             assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}'
-            assert retrieved_request.id == request_id, (
-                f'retrieved_request.id={retrieved_request.id}',
-                f'request_id={request_id}',
-            )
-            Actor.log.info('Request retrieved successfully by ID')
+            assert retrieved_request.unique_key == request_unique_key, (f'{request_unique_key=}',)
+            Actor.log.info('Request retrieved successfully by unique_key')
 
-            # Test with non-existent ID
-            non_existent_request = await rq.get_request('non-existent-id')
+            # Test with non-existent unique_key
+            non_existent_request = await rq.get_request('non-existent-unique_key')
             assert non_existent_request is None, f'non_existent_request={non_existent_request}'
-            Actor.log.info('Non-existent ID correctly returned None')
+            Actor.log.info('Non-existent unique_key correctly returned None')
 
-    actor = await make_actor(label='rq-get-by-id-test', main_func=main)
+    actor = await make_actor(label='rq-get-by-unique-key-test', main_func=main)
     run_result = await run_actor(actor)
     assert run_result.status == 'SUCCEEDED'
 
2 changes: 0 additions & 2 deletions tests/unit/scrapy/requests/test_to_apify_request.py
@@ -66,7 +66,6 @@ def test_with_id_and_unique_key(spider: Spider) -> None:
         url='https://example.com',
         method='GET',
         meta={
-            'apify_request_id': 'abc123',
             'apify_request_unique_key': 'https://example.com',
             'userData': {'some_user_data': 'hello'},
         },
@@ -77,7 +76,6 @@
 
     assert apify_request.url == 'https://example.com'
     assert apify_request.method == 'GET'
-    assert apify_request.id == 'abc123'
     assert apify_request.unique_key == 'https://example.com'
 
     user_data = apify_request.user_data
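For completeness, a hedged sketch of what the trimmed-down conversion test asserts after this change; the `spider` fixture and import path follow the test file above, while the test name is illustrative:

```python
from scrapy import Request, Spider

from apify.scrapy.requests import to_apify_request


def test_round_trip_keeps_unique_key(spider: Spider) -> None:
    scrapy_request = Request(
        url='https://example.com',
        method='GET',
        meta={'apify_request_unique_key': 'https://example.com'},
    )

    apify_request = to_apify_request(scrapy_request, spider=spider)
    assert apify_request is not None

    # The unique key is the only identifier carried over; there is
    # no apify_request.id to check anymore.
    assert apify_request.unique_key == 'https://example.com'
```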