
Commit 76886ce

fix: Cache requests in RQ implementations by id (#633)

### Description

- Cache requests in RQ implementations by `id` instead of `unique_key`, because the Apify platform truncates long `unique_key` values.

### Issues

- Closes: #630

### Testing

- Added a unit test.

1 parent 7e927a0 · commit 76886ce
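The fix keys the client-side request cache by the request `id`, which the client derives locally via `unique_key_to_request_id` (see the diffs below). A minimal sketch of why the `id` is the safer cache key, assuming the `id` is a short fixed-length digest of the full `unique_key` while the platform may truncate long `unique_key` values — the digest algorithm, id length, and truncation limit below are illustrative assumptions, not the SDK's actual values:

```python
import hashlib


def unique_key_to_request_id(unique_key: str, *, length: int = 15) -> str:
    """Illustrative stand-in for the SDK helper: derive a short, stable id."""
    return hashlib.sha256(unique_key.encode('utf-8')).hexdigest()[:length]


long_unique_key = 'https://example.com/?q=' + 'x' * 5_000

# Hypothetical platform-side truncation: the stored unique_key no longer
# matches the one the client sent, so unique_key-based cache lookups miss.
stored_unique_key = long_unique_key[:512]
assert stored_unique_key != long_unique_key

# The id is computed from the full unique_key on the client, is always
# short, and therefore survives round trips to the platform unchanged.
assert unique_key_to_request_id(long_unique_key) == unique_key_to_request_id(long_unique_key)
assert len(unique_key_to_request_id(long_unique_key)) == 15
```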

File tree

4 files changed: +153 −115 lines changed


src/apify/storage_clients/_apify/_models.py

Lines changed: 2 additions & 2 deletions

```diff
@@ -94,8 +94,8 @@ class CachedRequest(BaseModel):
     Only internal structure.
     """

-    unique_key: str
-    """Unique key of the request."""
+    id: str
+    """Id of the request."""

     was_already_handled: bool
     """Whether the request was already handled."""
```

src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 45 additions & 35 deletions

```diff
@@ -54,10 +54,10 @@ def __init__(
         """The Apify request queue client for API operations."""

         self._queue_head = deque[str]()
-        """A deque to store request unique keys in the queue head."""
+        """A deque to store request ids in the queue head."""

         self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
-        """A cache to store request objects. Request unique key is used as the cache key."""
+        """A cache to store request objects. Request id is used as the cache key."""

         self._queue_has_locked_requests: bool | None = None
         """Whether the queue has requests locked by another client."""
@@ -101,12 +101,14 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []

         for request in requests:
-            if self._requests_cache.get(request.unique_key):
+            request_id = unique_key_to_request_id(request.unique_key)
+            if self._requests_cache.get(request_id):
                 # We are not sure if it was already handled at this point, and it is not worth calling API for it.
                 # It could have been handled by another client in the meantime, so cached information about
                 # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
                     ProcessedRequest(
+                        id=request_id,
                         unique_key=request.unique_key,
                         was_already_present=True,
                         was_already_handled=request.was_already_handled,
@@ -116,12 +118,13 @@ async def add_batch_of_requests(
             else:
                 # Add new request to the cache.
                 processed_request = ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=request.was_already_handled,
                 )
                 self._cache_request(
-                    request.unique_key,
+                    request_id,
                     processed_request,
                 )
                 new_requests.append(request)
@@ -131,7 +134,6 @@ async def add_batch_of_requests(
             requests_dict = [
                 request.model_dump(
                     by_alias=True,
-                    exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
                 )
                 for request in new_requests
             ]
@@ -146,7 +148,8 @@ async def add_batch_of_requests(

             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
+                self._requests_cache.pop(unprocessed_request_id, None)

         else:
             api_response = AddRequestsResponse.model_validate(
@@ -179,7 +182,10 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
         """
-        response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
+        return await self._get_request_by_id(unique_key_to_request_id(unique_key))
+
+    async def _get_request_by_id(self, request_id: str) -> Request | None:
+        response = await self._api_client.get_request(request_id)

         if response is None:
             return None
@@ -206,32 +212,32 @@ async def fetch_next_request(self) -> Request | None:
             return None

         # Get the next request ID from the queue head
-        next_unique_key = self._queue_head.popleft()
+        next_request_id = self._queue_head.popleft()

-        request = await self._get_or_hydrate_request(next_unique_key)
+        request = await self._get_or_hydrate_request(next_request_id)

         # Handle potential inconsistency where request might not be in the main table yet
         if request is None:
             logger.debug(
                 'Cannot find a request from the beginning of queue, will be retried later',
-                extra={'nextRequestUniqueKey': next_unique_key},
+                extra={'next_request_id': next_request_id},
             )
             return None

         # If the request was already handled, skip it
         if request.handled_at is not None:
             logger.debug(
                 'Request fetched from the beginning of queue was already handled',
-                extra={'nextRequestUniqueKey': next_unique_key},
+                extra={'next_request_id': next_request_id},
             )
             return None

         # Use get request to ensure we have the full request object.
-        request = await self.get_request(request.unique_key)
+        request = await self._get_request_by_id(next_request_id)
         if request is None:
             logger.debug(
                 'Request fetched from the beginning of queue was not found in the RQ',
-                extra={'nextRequestUniqueKey': next_unique_key},
+                extra={'next_request_id': next_request_id},
             )
             return None

@@ -248,15 +254,17 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         Returns:
             Information about the queue operation. `None` if the given request was not in progress.
         """
+        request_id = unique_key_to_request_id(request.unique_key)
         # Set the handled_at timestamp if not already set
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)

-        if cached_request := self._requests_cache[request.unique_key]:
+        if cached_request := self._requests_cache[request_id]:
             cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
             processed_request = await self._update_request(request)
+            processed_request.id = request_id
             processed_request.unique_key = request.unique_key

             # Update assumed handled count if this wasn't already handled
@@ -265,10 +273,9 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
                 self.metadata.pending_request_count -= 1

             # Update the cache with the handled request
-            cache_key = request.unique_key
             self._cache_request(
-                cache_key,
-                processed_request,
+                cache_key=request_id,
+                processed_request=processed_request,
                 hydrated_request=request,
             )
         except Exception as exc:
@@ -352,17 +359,17 @@ async def _ensure_head_is_non_empty(self) -> None:
         # Fetch requests from the API and populate the queue head
         await self._list_head()

-    async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
-        """Get a request by unique key, either from cache or by fetching from API.
+    async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
+        """Get a request by id, either from cache or by fetching from API.

         Args:
-            unique_key: Unique key of the request to get.
+            request_id: Id of the request to get.

         Returns:
             The request if found and valid, otherwise None.
         """
         # First check if the request is in our cache
-        cached_entry = self._requests_cache.get(unique_key)
+        cached_entry = self._requests_cache.get(request_id)

         if cached_entry and cached_entry.hydrated:
             # If we have the request hydrated in cache, return it
@@ -371,25 +378,25 @@ async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
         # If not in cache or not hydrated, fetch the request
         try:
             # Fetch the request data
-            request = await self.get_request(unique_key)
+            request = await self._get_request_by_id(request_id)

             # If request is not found and return None
             if not request:
                 return None

             # Update cache with hydrated request
-            cache_key = request.unique_key
             self._cache_request(
-                cache_key,
-                ProcessedRequest(
+                cache_key=request_id,
+                processed_request=ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=request.handled_at is not None,
                 ),
                 hydrated_request=request,
             )
         except Exception as exc:
-            logger.debug(f'Error fetching request {unique_key}: {exc!s}')
+            logger.debug(f'Error fetching request {request_id}: {exc!s}')
             return None
         else:
             return request
@@ -438,8 +445,8 @@ async def _list_head(
             logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
             # Create a list of requests from the cached queue head
             items = []
-            for unique_key in list(self._queue_head)[:limit]:
-                cached_request = self._requests_cache.get(unique_key)
+            for request_id in list(self._queue_head)[:limit]:
+                cached_request = self._requests_cache.get(request_id)
                 if cached_request and cached_request.hydrated:
                     items.append(cached_request.hydrated)

@@ -472,32 +479,35 @@ async def _list_head(

         for request_data in response.get('items', []):
             request = Request.model_validate(request_data)
+            request_id = request_data.get('id')

             # Skip requests without ID or unique key
-            if not request.unique_key:
+            if not request.unique_key or not request_id:
                 logger.debug(
-                    'Skipping request from queue head, missing unique key',
+                    'Skipping request from queue head, missing unique key or id',
                     extra={
                         'unique_key': request.unique_key,
+                        'id': request_id,
                     },
                 )
                 continue

             # Cache the request
             self._cache_request(
-                request.unique_key,
+                request_id,
                 ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=False,
                 ),
                 hydrated_request=request,
             )
-            self._queue_head.append(request.unique_key)
+            self._queue_head.append(request_id)

-        for leftover_unique_key in leftover_buffer:
+        for leftover_id in leftover_buffer:
             # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
-            self._queue_head.append(leftover_unique_key)
+            self._queue_head.append(leftover_id)
         return RequestQueueHead.model_validate(response)

     def _cache_request(
@@ -516,7 +526,7 @@ def _cache_request(
             hydrated_request: The hydrated request object, if available.
         """
         self._requests_cache[cache_key] = CachedRequest(
-            unique_key=processed_request.unique_key,
+            id=processed_request.id,
             was_already_handled=processed_request.was_already_handled,
             hydrated=hydrated_request,
             lock_expires_at=None,
```
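The commit description mentions an added unit test. A hypothetical sketch of the regression such a test could guard against (not the commit's actual test; `rq_client` stands in for a request queue client wired to a test queue, and the response shape is assumed to expose `processed_requests`):

```python
import pytest
from crawlee import Request


# Two adds of the same request with an over-long unique_key must hit the
# id-keyed cache, even though the platform truncates the stored unique_key.
@pytest.mark.asyncio
async def test_long_unique_key_hits_id_keyed_cache(rq_client) -> None:
    request = Request.from_url(
        'https://example.com',
        unique_key='x' * 10_000,  # long enough for the platform to truncate
    )

    await rq_client.add_batch_of_requests([request])
    response = await rq_client.add_batch_of_requests([request])

    # The second add is answered from the cache, so the request is reported
    # as already present without another API round trip.
    assert response.processed_requests[0].was_already_present is True
```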
