Skip to content
32 changes: 29 additions & 3 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from logging import getLogger
from typing import TYPE_CHECKING, Final
from typing import TYPE_CHECKING, Annotated, Final

from cachetools import LRUCache
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import override

from apify_client import ApifyClientAsync
Expand Down Expand Up @@ -53,6 +54,30 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) ->
return url_safe_key[:request_id_length]


class RequestQueueStats(BaseModel):
model_config = ConfigDict(populate_by_name=True)

delete_count: Annotated[int, Field(alias='deleteCount', default=0)]
""""The number of request queue deletes."""

head_item_read_count: Annotated[int, Field(alias='headItemReadCount', default=0)]
"""The number of request queue head reads."""

read_count: Annotated[int, Field(alias='readCount', default=0)]
"""The number of request queue reads."""

storage_bytes: Annotated[int, Field(alias='storageBytes', default=0)]
"""Storage size in Bytes."""

write_count: Annotated[int, Field(alias='writeCount', default=0)]
"""The number of request queue writes."""


class ApifyRequestQueueMetadata(RequestQueueMetadata):
stats: Annotated[RequestQueueStats, Field(alias='stats', default_factory=RequestQueueStats)]
"""Additional statistics about the request queue."""


class ApifyRequestQueueClient(RequestQueueClient):
"""An Apify platform implementation of the request queue client."""

Expand Down Expand Up @@ -106,7 +131,7 @@ async def _get_metadata_estimate(self) -> RequestQueueMetadata:
return self._metadata

@override
async def get_metadata(self) -> RequestQueueMetadata:
async def get_metadata(self) -> ApifyRequestQueueMetadata:
"""Get metadata about the request queue.

Returns:
Expand All @@ -117,7 +142,7 @@ async def get_metadata(self) -> RequestQueueMetadata:
if response is None:
raise ValueError('Failed to fetch request queue metadata from the API.')
# Enhance API response by local estimations (API can be delayed few seconds, while local estimation not.)
return RequestQueueMetadata(
return ApifyRequestQueueMetadata(
id=response['id'],
name=response['name'],
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
Expand All @@ -127,6 +152,7 @@ async def get_metadata(self) -> RequestQueueMetadata:
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients,
stats=RequestQueueStats.model_validate(response['stats'], by_alias=True),
)

@classmethod
Expand Down
26 changes: 25 additions & 1 deletion tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import pytest

Expand All @@ -14,6 +14,7 @@
from crawlee.storages import RequestQueue

from .conftest import MakeActorFunction, RunActorFunction
from apify.storage_clients._apify._request_queue_client import ApifyRequestQueueMetadata


async def test_add_and_fetch_requests(
Expand Down Expand Up @@ -1278,3 +1279,26 @@ async def test_request_queue_not_had_multiple_clients(
api_response = await api_client.get()
assert api_response
assert api_response['hadMultipleClients'] is False


async def test_request_queue_has_stats(request_queue_force_cloud: RequestQueue) -> None:
"""Test that Apify based request queue has stats in metadata."""

add_request_count = 3
read_request_count = 2

await request_queue_force_cloud.add_requests(
[Request.from_url(f'http://example.com/{i}') for i in range(add_request_count)]
)
for _ in range(read_request_count):
await request_queue_force_cloud.get_request(Request.from_url('http://example.com/1').unique_key)

# Wait for stats to become stable
await asyncio.sleep(10)

metadata = await request_queue_force_cloud.get_metadata()

assert hasattr(metadata, 'stats')
apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
assert apify_metadata.stats.read_count == read_request_count
assert apify_metadata.stats.write_count == add_request_count
Loading