Skip to content

Commit b17ebef

Browse files
committed
Track pending_request_count in local metadata estimation
1 parent 6edb093 commit b17ebef

File tree

2 files changed

+6
-0
lines changed

2 files changed

+6
-0
lines changed

src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ async def add_batch_of_requests(
170170
new_request_count += 1
171171

172172
self.metadata.total_request_count += new_request_count
173+
self.metadata.pending_request_count += new_request_count
173174

174175
return api_response
175176

@@ -265,6 +266,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
265266
# Update assumed handled count if this wasn't already handled
266267
if not processed_request.was_already_handled:
267268
self.metadata.handled_request_count += 1
269+
self.metadata.pending_request_count -= 1
268270

269271
# Update the cache with the handled request
270272
cache_key = request.unique_key
@@ -312,6 +314,7 @@ async def reclaim_request(
312314
# we're putting it back for processing.
313315
if request.was_already_handled and not processed_request.was_already_handled:
314316
self.metadata.handled_request_count -= 1
317+
self.metadata.pending_request_count += 1
315318

316319
# Update the cache
317320
cache_key = request.unique_key

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ async def add_batch_of_requests(
168168
if not processed_request.was_already_present and not processed_request.was_already_handled:
169169
new_request_count += 1
170170
self.metadata.total_request_count += new_request_count
171+
self.metadata.pending_request_count += new_request_count
171172

172173
return api_response
173174

@@ -271,6 +272,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
271272
if request.handled_at is None:
272273
request.handled_at = datetime.now(tz=timezone.utc)
273274
self.metadata.handled_request_count += 1
275+
self.metadata.pending_request_count -= 1
274276

275277
if cached_request := self._requests_cache.get(request.unique_key):
276278
cached_request.handled_at = request.handled_at
@@ -333,6 +335,7 @@ async def reclaim_request(
333335
# we're putting it back for processing.
334336
if request.was_already_handled and not processed_request.was_already_handled:
335337
self.metadata.handled_request_count -= 1
338+
self.metadata.pending_request_count += 1
336339

337340
except Exception as exc:
338341
logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')

0 commit comments

Comments
 (0)