Skip to content

Commit c5968bc

Browse files
committed
Use composition instead of inheritance
1 parent b4a588d commit c5968bc

File tree

6 files changed

+191
-87
lines changed

6 files changed

+191
-87
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 128 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,32 @@
11
from __future__ import annotations
22

3-
import re
4-
from base64 import b64encode
5-
from hashlib import sha256
63
from logging import getLogger
7-
from typing import TYPE_CHECKING, Final
4+
from typing import TYPE_CHECKING, Final, Literal
85

96
from typing_extensions import override
107

118
from apify_client import ApifyClientAsync
129
from crawlee._utils.crypto import crypto_random_object_id
1310
from crawlee.storage_clients._base import RequestQueueClient
14-
from crawlee.storage_clients.models import RequestQueueMetadata
11+
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1512
from crawlee.storages import RequestQueue
1613

1714
from ._models import ApifyRequestQueueMetadata, RequestQueueStats
15+
from ._request_queue_shared_client import _ApifyRequestQueueSharedClient
16+
from ._request_queue_single_client import _ApifyRequestQueueSingleClient
1817
from ._utils import AliasResolver
1918

2019
if TYPE_CHECKING:
20+
from collections.abc import Sequence
21+
2122
from apify_client.clients import RequestQueueClientAsync
23+
from crawlee import Request
2224

2325
from apify import Configuration
2426

2527
logger = getLogger(__name__)
2628

2729

28-
def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
29-
"""Generate a deterministic request ID based on a unique key.
30-
31-
Args:
32-
unique_key: The unique key to convert into a request ID.
33-
request_id_length: The length of the request ID.
34-
35-
Returns:
36-
A URL-safe, truncated request ID based on the unique key.
37-
"""
38-
# Encode the unique key and compute its SHA-256 hash
39-
hashed_key = sha256(unique_key.encode('utf-8')).digest()
40-
41-
# Encode the hash in base64 and decode it to get a string
42-
base64_encoded = b64encode(hashed_key).decode('utf-8')
43-
44-
# Remove characters that are not URL-safe ('+', '/', or '=')
45-
url_safe_key = re.sub(r'(\+|\/|=)', '', base64_encoded)
46-
47-
# Truncate the key to the desired length
48-
return url_safe_key[:request_id_length]
49-
50-
5130
class ApifyRequestQueueClient(RequestQueueClient):
5231
"""Base class for Apify platform implementations of the request queue client."""
5332

@@ -59,6 +38,7 @@ def __init__(
5938
*,
6039
api_client: RequestQueueClientAsync,
6140
metadata: RequestQueueMetadata,
41+
access: Literal['single', 'shared'] = 'single',
6242
) -> None:
6343
"""Initialize a new instance.
6444
@@ -67,8 +47,112 @@ def __init__(
6747
self._api_client = api_client
6848
"""The Apify request queue client for API operations."""
6949

70-
self._metadata = metadata
71-
"""Additional data related to the RequestQueue."""
50+
self._implementation: _ApifyRequestQueueSingleClient | _ApifyRequestQueueSharedClient
51+
"""Internal implementation used to communicate with the Apify platform based Request Queue."""
52+
if access == 'single':
53+
self._implementation = _ApifyRequestQueueSingleClient(
54+
api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS
55+
)
56+
elif access == 'shared':
57+
self._implementation = _ApifyRequestQueueSharedClient(
58+
api_client=self._api_client,
59+
metadata=metadata,
60+
cache_size=self._MAX_CACHED_REQUESTS,
61+
metadata_getter=self.get_metadata,
62+
)
63+
else:
64+
raise RuntimeError(f"Unsupported access type: {access}. Allowed values are 'single' or 'shared'.")
65+
66+
@property
67+
def _metadata(self) -> RequestQueueMetadata:
68+
return self._implementation.metadata
69+
70+
@override
71+
async def add_batch_of_requests(
72+
self,
73+
requests: Sequence[Request],
74+
*,
75+
forefront: bool = False,
76+
) -> AddRequestsResponse:
77+
"""Add a batch of requests to the queue.
78+
79+
Args:
80+
requests: The requests to add.
81+
forefront: Whether to add the requests to the beginning of the queue.
82+
83+
Returns:
84+
Response containing information about the added requests.
85+
"""
86+
return await self._implementation.add_batch_of_requests(requests, forefront=forefront)
87+
88+
@override
89+
async def fetch_next_request(self) -> Request | None:
90+
"""Return the next request in the queue to be processed.
91+
92+
Once you successfully finish processing of the request, you need to call `mark_request_as_handled`
93+
to mark the request as handled in the queue. If there was some error in processing the request, call
94+
`reclaim_request` instead, so that the queue will give the request to some other consumer
95+
in another call to the `fetch_next_request` method.
96+
97+
Returns:
98+
The request or `None` if there are no more pending requests.
99+
"""
100+
return await self._implementation.fetch_next_request()
101+
102+
@override
103+
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
104+
"""Mark a request as handled after successful processing.
105+
106+
Handled requests will never again be returned by the `fetch_next_request` method.
107+
108+
Args:
109+
request: The request to mark as handled.
110+
111+
Returns:
112+
Information about the queue operation. `None` if the given request was not in progress.
113+
"""
114+
return await self._implementation.mark_request_as_handled(request)
115+
116+
@override
117+
async def get_request(self, unique_key: str) -> Request | None:
118+
"""Get a request by unique key.
119+
120+
Args:
121+
unique_key: Unique key of the request to get.
122+
123+
Returns:
124+
The request or None if not found.
125+
"""
126+
return await self._implementation.get_request(unique_key)
127+
128+
@override
129+
async def reclaim_request(
130+
self,
131+
request: Request,
132+
*,
133+
forefront: bool = False,
134+
) -> ProcessedRequest | None:
135+
"""Reclaim a failed request back to the queue.
136+
137+
The request will be returned for processing later again by another call to `fetch_next_request`.
138+
139+
Args:
140+
request: The request to return to the queue.
141+
forefront: Whether to add the request to the head or the end of the queue.
142+
143+
Returns:
144+
Information about the queue operation. `None` if the given request was not in progress.
145+
"""
146+
return await self._implementation.reclaim_request(request, forefront=forefront)
147+
148+
@override
149+
async def is_empty(self) -> bool:
150+
"""Check if the queue is empty.
151+
152+
Returns:
153+
True if the queue is empty, False otherwise.
154+
"""
155+
return await self._implementation.is_empty()
72156

73157
@override
74158
async def get_metadata(self) -> ApifyRequestQueueMetadata:
@@ -103,6 +187,7 @@ async def open(
103187
name: str | None,
104188
alias: str | None,
105189
configuration: Configuration,
190+
access: Literal['single', 'shared'] = 'single',
106191
) -> ApifyRequestQueueClient:
107192
"""Open an Apify request queue client.
108193
@@ -120,6 +205,18 @@ async def open(
120205
configuration: The configuration object containing API credentials and settings. Must include a valid
121206
`token` and `api_base_url`. May also contain a `default_request_queue_id` for fallback when neither
122207
`id`, `name`, nor `alias` is provided.
208+
access: Controls the implementation of the request queue client based on expected scenario:
209+
- 'single' is suitable for single-consumer scenarios. It makes fewer API calls, and is cheaper and faster.
210+
- 'shared' is suitable for multiple-consumer scenarios at the cost of higher API usage.
211+
212+
Detailed constraints for the 'single' access type:
213+
- Only one client is consuming the request queue at a time.
214+
- Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to
215+
be handled as quickly, because this client does not aggressively fetch the forefront and relies on local
216+
head estimation.
217+
- Requests are only added to the queue, never deleted by other clients. (Marking as handled is ok.)
218+
- Other producers can add new requests, but not modify existing ones.
219+
(Modifications would not be included in local cache)
123220
124221
Returns:
125222
An instance for the opened or created storage client.
@@ -217,10 +314,7 @@ async def open(
217314

218315
metadata_model = RequestQueueMetadata.model_validate(metadata)
219316

220-
return cls(
221-
api_client=apify_rq_client,
222-
metadata=metadata_model,
223-
)
317+
return cls(api_client=apify_rq_client, metadata=metadata_model, access=access)
224318

225319
@override
226320
async def purge(self) -> None:

src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,26 @@
44
from collections import deque
55
from datetime import datetime, timedelta, timezone
66
from logging import getLogger
7-
from typing import TYPE_CHECKING, Final
7+
from typing import TYPE_CHECKING, Any, Final
88

99
from cachetools import LRUCache
10-
from typing_extensions import override
1110

1211
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1312

14-
from . import ApifyRequestQueueClient
15-
from ._models import CachedRequest, RequestQueueHead
16-
from ._request_queue_client import unique_key_to_request_id
13+
from ._models import ApifyRequestQueueMetadata, CachedRequest, RequestQueueHead
14+
from ._utils import unique_key_to_request_id
1715
from apify import Request
1816

1917
if TYPE_CHECKING:
20-
from collections.abc import Sequence
18+
from collections.abc import Callable, Coroutine, Sequence
2119

2220
from apify_client.clients import RequestQueueClientAsync
2321

2422

2523
logger = getLogger(__name__)
2624

2725

28-
class ApifyRequestQueueSharedClient(ApifyRequestQueueClient):
26+
class _ApifyRequestQueueSharedClient:
2927
"""An Apify platform implementation of the request queue client.
3028
3129
This implementation supports scenarios with multiple producers and multiple consumers.
@@ -39,21 +37,26 @@ def __init__(
3937
*,
4038
api_client: RequestQueueClientAsync,
4139
metadata: RequestQueueMetadata,
40+
cache_size: int,
41+
metadata_getter: Callable[[], Coroutine[Any, Any, ApifyRequestQueueMetadata]],
4242
) -> None:
4343
"""Initialize a new instance.
4444
4545
Preferably use the `ApifyRequestQueueClient.open` class method to create a new instance.
4646
"""
47+
self.metadata = metadata
48+
"""Additional data related to the RequestQueue."""
49+
50+
self._metadata_getter = metadata_getter
51+
"""Async function to get metadata from API."""
52+
4753
self._api_client = api_client
4854
"""The Apify request queue client for API operations."""
4955

50-
self._metadata = metadata
51-
"""Additional data related to the RequestQueue."""
52-
5356
self._queue_head = deque[str]()
5457
"""A deque to store request unique keys in the queue head."""
5558

56-
self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=self._MAX_CACHED_REQUESTS)
59+
self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
5760
"""A cache to store request objects. Request unique key is used as the cache key."""
5861

5962
self._queue_has_locked_requests: bool | None = None
@@ -72,12 +75,11 @@ async def _get_metadata_estimate(self) -> RequestQueueMetadata:
7275
Local estimation of metadata is without delay, unlike metadata from the API. In situations where there is only one
7376
client, it is the better choice.
7477
"""
75-
if self._metadata.had_multiple_clients:
76-
return await self.get_metadata()
78+
if self.metadata.had_multiple_clients:
79+
return await self._metadata_getter()
7780
# Get local estimation (will not include changes done by another client)
78-
return self._metadata
81+
return self.metadata
7982

80-
@override
8183
async def add_batch_of_requests(
8284
self,
8385
requests: Sequence[Request],
@@ -167,11 +169,10 @@ async def add_batch_of_requests(
167169
if not processed_request.was_already_present and not processed_request.was_already_handled:
168170
new_request_count += 1
169171

170-
self._metadata.total_request_count += new_request_count
172+
self.metadata.total_request_count += new_request_count
171173

172174
return api_response
173175

174-
@override
175176
async def get_request(self, unique_key: str) -> Request | None:
176177
"""Get a request by unique key.
177178
@@ -188,7 +189,6 @@ async def get_request(self, unique_key: str) -> Request | None:
188189

189190
return Request.model_validate(response)
190191

191-
@override
192192
async def fetch_next_request(self) -> Request | None:
193193
"""Return the next request in the queue to be processed.
194194
@@ -240,7 +240,6 @@ async def fetch_next_request(self) -> Request | None:
240240

241241
return request
242242

243-
@override
244243
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
245244
"""Mark a request as handled after successful processing.
246245
@@ -265,7 +264,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
265264

266265
# Update assumed handled count if this wasn't already handled
267266
if not processed_request.was_already_handled:
268-
self._metadata.handled_request_count += 1
267+
self.metadata.handled_request_count += 1
269268

270269
# Update the cache with the handled request
271270
cache_key = request.unique_key
@@ -280,7 +279,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
280279
else:
281280
return processed_request
282281

283-
@override
284282
async def reclaim_request(
285283
self,
286284
request: Request,
@@ -313,7 +311,7 @@ async def reclaim_request(
313311
# If the request was previously handled, decrement our handled count since
314312
# we're putting it back for processing.
315313
if request.was_already_handled and not processed_request.was_already_handled:
316-
self._metadata.handled_request_count -= 1
314+
self.metadata.handled_request_count -= 1
317315

318316
# Update the cache
319317
cache_key = request.unique_key
@@ -334,7 +332,6 @@ async def reclaim_request(
334332
else:
335333
return processed_request
336334

337-
@override
338335
async def is_empty(self) -> bool:
339336
"""Check if the queue is empty.
340337
@@ -472,7 +469,7 @@ async def _list_head(
472469
# Update the queue head cache
473470
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
474471
# Check if there is another client working with the RequestQueue
475-
self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
472+
self.metadata.had_multiple_clients = response.get('hadMultipleClients', False)
476473

477474
for request_data in response.get('items', []):
478475
request = Request.model_validate(request_data)

0 commit comments

Comments
 (0)