Skip to content

Commit 71ac38d

Browse files
committed
Update request queue to use manual request tracking
1 parent f82d110 commit 71ac38d

File tree

3 files changed

+84
-5
lines changed

3 files changed

+84
-5
lines changed

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ async def open(
131131
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
132132

133133
# If both id and name are None, try to get the default storage ID from environment variables.
134+
# The default storage ID environment variable is set by the Apify platform. It also contains
135+
# a new storage ID after the Actor's reboot or migration.
134136
if id is None and name is None:
135137
id = configuration.default_dataset_id
136138
apify_dataset_client = apify_client_async.dataset(dataset_id=id)

src/apify/storage_clients/_apify/_key_value_store_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ async def open(
123123
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
124124

125125
# If both id and name are None, try to get the default storage ID from environment variables.
126+
# The default storage ID environment variable is set by the Apify platform. It also contains
127+
# a new storage ID after the Actor's reboot or migration.
126128
if id is None and name is None:
127129
id = configuration.default_key_value_store_id
128130
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ def __init__(
3939
self,
4040
*,
4141
api_client: RequestQueueClientAsync,
42+
id: str,
43+
name: str | None,
44+
total_request_count: int,
45+
handled_request_count: int,
4246
) -> None:
4347
"""Initialize a new instance.
4448
@@ -47,6 +51,12 @@ def __init__(
4751
self._api_client = api_client
4852
"""The Apify request queue client for API operations."""
4953

54+
self._id = id
55+
"""The ID of the request queue."""
56+
57+
self._name = name
58+
"""The name of the request queue."""
59+
5060
self._queue_head = deque[str]()
5161
"""A deque to store request IDs in the queue head."""
5262

@@ -59,10 +69,38 @@ def __init__(
5969
self._should_check_for_forefront_requests = False
6070
"""Whether to check for forefront requests in the next list_head call."""
6171

72+
self._had_multiple_clients = False
73+
"""Whether the request queue has been accessed by multiple clients."""
74+
75+
self._initial_total_count = total_request_count
76+
"""The initial total request count (from the API) when the queue was opened."""
77+
78+
self._initial_handled_count = handled_request_count
79+
"""The initial handled request count (from the API) when the queue was opened."""
80+
81+
self._assumed_total_count = 0
82+
"""The number of requests we assume are in the queue (tracked manually for this instance)."""
83+
84+
self._assumed_handled_count = 0
85+
"""The number of requests we assume have been handled (tracked manually for this instance)."""
86+
6287
@override
6388
async def get_metadata(self) -> RequestQueueMetadata:
64-
metadata = await self._api_client.get()
65-
return RequestQueueMetadata.model_validate(metadata)
89+
total_count = self._initial_total_count + self._assumed_total_count
90+
handled_count = self._initial_handled_count + self._assumed_handled_count
91+
pending_count = total_count - handled_count
92+
93+
return RequestQueueMetadata(
94+
id=self._id,
95+
name=self._name,
96+
total_request_count=total_count,
97+
handled_request_count=handled_count,
98+
pending_request_count=pending_count,
99+
created_at=datetime.now(timezone.utc),
100+
modified_at=datetime.now(timezone.utc),
101+
accessed_at=datetime.now(timezone.utc),
102+
had_multiple_clients=self._had_multiple_clients,
103+
)
66104

67105
@classmethod
68106
async def open(
@@ -136,6 +174,8 @@ async def open(
136174
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
137175

138176
# If both id and name are None, try to get the default storage ID from environment variables.
177+
# The default storage ID environment variable is set by the Apify platform. It also contains
178+
# a new storage ID after the Actor's reboot or migration.
139179
if id is None and name is None:
140180
id = configuration.default_request_queue_id
141181
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
@@ -155,8 +195,20 @@ async def open(
155195
if metadata is None:
156196
raise ValueError(f'Opening request queue with id={id} and name={name} failed.')
157197

198+
metadata_model = RequestQueueMetadata.model_validate(
199+
await apify_rqs_client.get_or_create(),
200+
)
201+
202+
# Ensure we have a valid ID.
203+
if id is None:
204+
raise ValueError('Request queue ID cannot be None.')
205+
158206
return cls(
159207
api_client=apify_rq_client,
208+
id=id,
209+
name=name,
210+
total_request_count=metadata_model.total_request_count,
211+
handled_request_count=metadata_model.handled_request_count,
160212
)
161213

162214
@override
@@ -195,10 +247,19 @@ async def add_batch_of_requests(
195247
for request in requests
196248
]
197249

198-
# Send requests to API
250+
# Send requests to API.
199251
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
200252

201-
return AddRequestsResponse.model_validate(response)
253+
# Update assumed total count for newly added requests.
254+
api_response = AddRequestsResponse.model_validate(response)
255+
new_request_count = 0
256+
for processed_request in api_response.processed_requests:
257+
if not processed_request.was_already_present and not processed_request.was_already_handled:
258+
new_request_count += 1
259+
260+
self._assumed_total_count += new_request_count
261+
262+
return api_response
202263

203264
@override
204265
async def get_request(self, request_id: str) -> Request | None:
@@ -288,6 +349,10 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
288349
processed_request = await self._update_request(request)
289350
processed_request.unique_key = request.unique_key
290351

352+
# Update assumed handled count if this wasn't already handled.
353+
if not processed_request.was_already_handled:
354+
self._assumed_handled_count += 1
355+
291356
# Update the cache with the handled request
292357
cache_key = unique_key_to_request_id(request.unique_key)
293358
self._cache_request(
@@ -320,11 +385,21 @@ async def reclaim_request(
320385
Returns:
321386
Information about the queue operation. `None` if the given request was not in progress.
322387
"""
388+
# If the request was marked as handled, clear its `handled_at` timestamp. When reclaiming,
389+
# we want to put the request back for processing.
390+
if request.was_already_handled:
391+
request.handled_at = None
392+
323393
try:
324-
# Update the request in the API
394+
# Update the request in the API.
325395
processed_request = await self._update_request(request, forefront=forefront)
326396
processed_request.unique_key = request.unique_key
327397

398+
# If the request was previously handled, decrement our handled count since
399+
# we're putting it back for processing.
400+
if request.was_already_handled and not processed_request.was_already_handled:
401+
self._assumed_handled_count -= 1
402+
328403
# Update the cache
329404
cache_key = unique_key_to_request_id(request.unique_key)
330405
self._cache_request(

0 commit comments

Comments
 (0)