-
Notifications
You must be signed in to change notification settings - Fork 15
refactor!: Make Actor
initialization stricter and more predictable
#576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
8369ff9
d9137aa
cf1ee6f
d6b85ac
ea8e085
9c3e7b1
a2825bf
0b96454
432c79c
a4a046e
2a52cdc
841d89a
8bd59fd
c4b5d48
19ea5c7
54a3523
c89fd73
b4efbff
6a6ab98
f1ce0d1
8347eb6
b7101a4
19b79f1
b256876
6fbb5f4
5bf51f7
14c5395
f7c9a58
4450bf8
f28fcd7
c2c8ca5
7911c48
e68bdef
04e74bc
1cb295f
4b5946f
70890e7
2d61f1e
3813ea3
5608b4d
4b6c414
79b0ff7
b424c9b
cae107e
177dbb2
698c089
3b7634a
0de39ad
b19ea8c
85bf04d
47f84eb
7d1fc84
35ad0dd
28ebd8d
f884356
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
vdusek marked this conversation as resolved.
Show resolved
Hide resolved
vdusek marked this conversation as resolved.
Show resolved
Hide resolved
|
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
from pydantic import AliasChoices, BeforeValidator, Field, model_validator | ||
from typing_extensions import Self, deprecated | ||
|
||
from crawlee import service_locator | ||
from crawlee._utils.models import timedelta_ms | ||
from crawlee._utils.urls import validate_http_url | ||
from crawlee.configuration import Configuration as CrawleeConfiguration | ||
|
@@ -424,11 +425,35 @@ def disable_browser_sandbox_on_platform(self) -> Self: | |
def get_global_configuration(cls) -> Configuration: | ||
"""Retrieve the global instance of the configuration. | ||
|
||
Mostly for the backwards compatibility. It is recommended to use the `service_locator.get_configuration()` | ||
instead. | ||
This method ensures that ApifyConfiguration is returned, even if CrawleeConfiguration was set in the | ||
service locator. | ||
""" | ||
return cls() | ||
global_configuration = service_locator.get_configuration() | ||
|
||
if isinstance(global_configuration, Configuration): | ||
# If Apify configuration was already stored in service locator, return it. | ||
return global_configuration | ||
|
||
# Monkey-patch the base class so that it works with the extended configuration | ||
CrawleeConfiguration.get_global_configuration = Configuration.get_global_configuration # type: ignore[method-assign] | ||
return cls.from_configuration(global_configuration) | ||
|
||
@classmethod | ||
def from_configuration(cls, configuration: CrawleeConfiguration) -> Configuration: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The result should be cached so that two calls to get_global_configuration always return the exact same object. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added a warning to the only potential path where this could cause a problem. If someone gets there, it is not by intention, but for the sake of backward compatibility, it will still work in most cases as expected. |
||
"""Create Apify Configuration from existing Crawlee Configuration. | ||
|
||
Args: | ||
configuration: The existing Crawlee Configuration. | ||
|
||
Returns: | ||
The created Apify Configuration. | ||
""" | ||
apify_configuration = cls() | ||
|
||
# Ensure the returned configuration is of type Apify Configuration. | ||
# Most likely crawlee configuration was already set. Create Apify configuration from it. | ||
# Due to known Pydantic issue https://github.com/pydantic/pydantic/issues/9516, creating new instance of | ||
# Configuration from existing one in situation where environment can have some fields set by alias is very | ||
# unpredictable. Use the stable workaround. | ||
for name in configuration.model_fields: | ||
setattr(apify_configuration, name, getattr(configuration, name)) | ||
Pijukatel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return apify_configuration |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,72 +9,71 @@ | |
from ._dataset_client import ApifyDatasetClient | ||
from ._key_value_store_client import ApifyKeyValueStoreClient | ||
from ._request_queue_client import ApifyRequestQueueClient | ||
from apify._configuration import Configuration as ApifyConfiguration | ||
from apify._utils import docs_group | ||
|
||
if TYPE_CHECKING: | ||
from crawlee.configuration import Configuration | ||
from collections.abc import Hashable | ||
|
||
from crawlee.configuration import Configuration as CrawleeConfiguration | ||
|
||
|
||
@docs_group('Storage clients') | ||
class ApifyStorageClient(StorageClient): | ||
"""Apify storage client.""" | ||
|
||
# This class breaches Liskov Substitution Principle. It requires specialized Configuration compared to its parent. | ||
_lsp_violation_error_message_template = ( | ||
'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.' | ||
) | ||
|
||
@override | ||
def get_additional_cache_key(self, configuration: CrawleeConfiguration) -> Hashable: | ||
if isinstance(configuration, ApifyConfiguration): | ||
return f'{configuration.api_base_url},{configuration.token}' | ||
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__)) | ||
|
||
@override | ||
async def create_dataset_client( | ||
self, | ||
*, | ||
id: str | None = None, | ||
name: str | None = None, | ||
configuration: Configuration | None = None, | ||
alias: str | None = None, | ||
configuration: CrawleeConfiguration | None = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I guess I might just be out of the loop, but wasn't this supposed to work with some "stored" configuration so that we could just remove this parameter? Is there some reason why we need it? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tried that approach initially, but that led to more problems related to the order in which services can be set in the service locator. Since one service is dependent on another service, but they can be set independently... |
||
) -> ApifyDatasetClient: | ||
# Import here to avoid circular imports. | ||
from apify import Configuration as ApifyConfiguration # noqa: PLC0415 | ||
|
||
configuration = configuration or ApifyConfiguration.get_global_configuration() | ||
if isinstance(configuration, ApifyConfiguration): | ||
return await ApifyDatasetClient.open(id=id, name=name, configuration=configuration) | ||
|
||
raise TypeError( | ||
f'Expected "configuration" to be an instance of "apify.Configuration", ' | ||
f'but got {type(configuration).__name__} instead.' | ||
) | ||
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__)) | ||
|
||
@override | ||
async def create_kvs_client( | ||
self, | ||
*, | ||
id: str | None = None, | ||
name: str | None = None, | ||
configuration: Configuration | None = None, | ||
alias: str | None = None, | ||
configuration: CrawleeConfiguration | None = None, | ||
) -> ApifyKeyValueStoreClient: | ||
# Import here to avoid circular imports. | ||
from apify import Configuration as ApifyConfiguration # noqa: PLC0415 | ||
|
||
configuration = configuration or ApifyConfiguration.get_global_configuration() | ||
if isinstance(configuration, ApifyConfiguration): | ||
return await ApifyKeyValueStoreClient.open(id=id, name=name, configuration=configuration) | ||
|
||
raise TypeError( | ||
f'Expected "configuration" to be an instance of "apify.Configuration", ' | ||
f'but got {type(configuration).__name__} instead.' | ||
) | ||
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__)) | ||
|
||
@override | ||
async def create_rq_client( | ||
self, | ||
*, | ||
id: str | None = None, | ||
name: str | None = None, | ||
configuration: Configuration | None = None, | ||
alias: str | None = None, | ||
configuration: CrawleeConfiguration | None = None, | ||
) -> ApifyRequestQueueClient: | ||
# Import here to avoid circular imports. | ||
from apify import Configuration as ApifyConfiguration # noqa: PLC0415 | ||
|
||
configuration = configuration or ApifyConfiguration.get_global_configuration() | ||
if isinstance(configuration, ApifyConfiguration): | ||
return await ApifyRequestQueueClient.open(id=id, name=name, configuration=configuration) | ||
|
||
raise TypeError( | ||
f'Expected "configuration" to be an instance of "apify.Configuration", ' | ||
f'but got {type(configuration).__name__} instead.' | ||
) | ||
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
# The test fixture will put the Apify SDK wheel path on the next line | ||
APIFY_SDK_WHEEL_PLACEHOLDER | ||
uvicorn[standard] | ||
crawlee[parsel]==1.0.0rc1 | ||
crawlee[parsel] @ git+https://github.com/apify/crawlee-python.git@storage-clients-and-configurations-2 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from __future__ import annotations | ||
vdusek marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
from typing import TYPE_CHECKING | ||
|
||
if TYPE_CHECKING: | ||
from .conftest import MakeActorFunction, RunActorFunction | ||
|
||
|
||
async def test_migration_through_reboot(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None: | ||
"""Test that actor works as expected after migration through testing behavior after reboot. | ||
Handle two requests. Migrate in between the two requests.""" | ||
|
||
async def main() -> None: | ||
from crawlee._types import BasicCrawlingContext, ConcurrencySettings | ||
from crawlee.crawlers import BasicCrawler | ||
|
||
from apify import Actor | ||
|
||
async with Actor: | ||
crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1)) | ||
requests = ['https://example.com/1', 'https://example.com/2'] | ||
|
||
run = await Actor.apify_client.run(Actor.config.actor_run_id or '').get() | ||
assert run | ||
first_run = run.get('stats', {}).get('rebootCount', 0) == 0 | ||
Actor.log.warning(run) | ||
|
||
@crawler.router.default_handler | ||
async def default_handler(context: BasicCrawlingContext) -> None: | ||
context.log.info(f'Processing {context.request.url} ...') | ||
|
||
# Simulate migration through reboot | ||
if context.request.url == requests[1] and first_run: | ||
context.log.info(f'Reclaiming {context.request.url} ...') | ||
rq = await crawler.get_request_manager() | ||
await rq.reclaim_request(context.request) | ||
await Actor.reboot() | ||
|
||
await crawler.run(requests) | ||
|
||
# Each time one request is finished. | ||
assert crawler.statistics.state.requests_finished == 1 | ||
|
||
actor = await make_actor(label='migration', main_func=main) | ||
run_result = await run_actor(actor) | ||
|
||
assert run_result.status == 'SUCCEEDED' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
from unittest import mock | ||
from unittest.mock import AsyncMock | ||
|
||
import pytest | ||
|
||
from crawlee.storages import Dataset, KeyValueStore, RequestQueue | ||
vdusek marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
from crawlee.storages._base import Storage | ||
|
||
from apify import Configuration | ||
from apify.storage_clients import ApifyStorageClient | ||
from apify.storage_clients._apify import ApifyDatasetClient, ApifyKeyValueStoreClient, ApifyRequestQueueClient | ||
|
||
|
||
@pytest.mark.parametrize( | ||
('storage', '_storage_client'), | ||
[ | ||
(Dataset, ApifyDatasetClient), | ||
(KeyValueStore, ApifyKeyValueStoreClient), | ||
(RequestQueue, ApifyRequestQueueClient), | ||
], | ||
) | ||
async def test_get_additional_cache_key( | ||
storage: Storage, _storage_client: ApifyDatasetClient | ApifyKeyValueStoreClient | ApifyRequestQueueClient | ||
) -> None: | ||
"""Test that Storages based on `ApifyStorageClient` include `token` and `api_base_url` in additional cache key.""" | ||
storage_names = iter(['1', '2', '3', '1', '3']) | ||
|
||
apify_storage_client = ApifyStorageClient() | ||
|
||
config_1 = Configuration(token='a') | ||
config_2 = Configuration(token='b') | ||
config_3 = Configuration(token='a', api_base_url='https://super_custom_api.com') | ||
|
||
config_4 = Configuration(token='a') | ||
config_5 = Configuration(token='a', api_base_url='https://super_custom_api.com') | ||
|
||
mocked_open = AsyncMock(spec=_storage_client.open) | ||
mocked_open.get_metadata = AsyncMock(storage_names) | ||
|
||
with mock.patch.object(_storage_client, 'open', mocked_open): | ||
storage_1 = await storage.open(storage_client=apify_storage_client, configuration=config_1) | ||
storage_2 = await storage.open(storage_client=apify_storage_client, configuration=config_2) | ||
storage_3 = await storage.open(storage_client=apify_storage_client, configuration=config_3) | ||
storage_4 = await storage.open(storage_client=apify_storage_client, configuration=config_4) | ||
storage_5 = await storage.open(storage_client=apify_storage_client, configuration=config_5) | ||
|
||
# Different configuration results in different storage clients. | ||
assert storage_1 is not storage_2 | ||
assert storage_1 is not storage_3 | ||
assert storage_2 is not storage_3 | ||
|
||
# Equivalent configuration results in same storage clients. | ||
assert storage_1 is storage_4 | ||
assert storage_3 is storage_5 |
Uh oh!
There was an error while loading. Please reload this page.