-
Notifications
You must be signed in to change notification settings - Fork 487
fix: Fix KeyValueStore.auto_saved_value
failing in some scenarios
#1438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
from collections.abc import Sequence | ||
|
||
from crawlee.configuration import Configuration | ||
from crawlee.storages import KeyValueStore | ||
|
||
logger = getLogger(__name__) | ||
|
||
|
@@ -92,6 +93,7 @@ def __init__( | |
metadata: RequestQueueMetadata, | ||
path_to_rq: Path, | ||
lock: asyncio.Lock, | ||
recoverable_state: RecoverableState[RequestQueueState], | ||
) -> None: | ||
"""Initialize a new instance. | ||
|
||
|
@@ -114,12 +116,7 @@ def __init__( | |
self._is_empty_cache: bool | None = None | ||
"""Cache for is_empty result: None means unknown, True/False is cached state.""" | ||
|
||
self._state = RecoverableState[RequestQueueState]( | ||
default_state=RequestQueueState(), | ||
persist_state_key=f'__RQ_STATE_{self._metadata.id}', | ||
persistence_enabled=True, | ||
logger=logger, | ||
) | ||
self._state = recoverable_state | ||
"""Recoverable state to maintain request ordering, in-progress status, and handled status.""" | ||
|
||
@override | ||
|
@@ -136,6 +133,22 @@ def path_to_metadata(self) -> Path: | |
"""The full path to the request queue metadata file.""" | ||
return self.path_to_rq / METADATA_FILENAME | ||
|
||
@classmethod | ||
async def _create_recoverable_state(cls, id: str, configuration: Configuration) -> RecoverableState: | ||
async def kvs_factory() -> KeyValueStore: | ||
from crawlee.storage_clients import FileSystemStorageClient # noqa: PLC0415 avoid circular import | ||
from crawlee.storages import KeyValueStore # noqa: PLC0415 avoid circular import | ||
|
||
return await KeyValueStore.open(storage_client=FileSystemStorageClient(), configuration=configuration) | ||
|
||
return RecoverableState[RequestQueueState]( | ||
default_state=RequestQueueState(), | ||
persist_state_key=f'__RQ_STATE_{id}', | ||
persist_state_kvs_factory=kvs_factory, | ||
persistence_enabled=True, | ||
logger=logger, | ||
) | ||
|
||
@classmethod | ||
async def open( | ||
cls, | ||
|
@@ -194,6 +207,9 @@ async def open( | |
metadata=metadata, | ||
path_to_rq=rq_base_path / rq_dir, | ||
lock=asyncio.Lock(), | ||
recoverable_state=await cls._create_recoverable_state( | ||
id=id, configuration=configuration | ||
), | ||
) | ||
await client._state.initialize() | ||
await client._discover_existing_requests() | ||
|
@@ -230,6 +246,7 @@ async def open( | |
metadata=metadata, | ||
path_to_rq=path_to_rq, | ||
lock=asyncio.Lock(), | ||
recoverable_state=await cls._create_recoverable_state(id=metadata.id, configuration=configuration), | ||
) | ||
|
||
await client._state.initialize() | ||
|
@@ -254,6 +271,7 @@ async def open( | |
metadata=metadata, | ||
path_to_rq=path_to_rq, | ||
lock=asyncio.Lock(), | ||
recoverable_state=await cls._create_recoverable_state(id=metadata.id, configuration=configuration), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of creating it three times, we can create it once, store it in a variable, and just pass it where needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That creation requires metadata to get the RQ.id, so we have to repeat this call, as in all three branches, we get metadata in a different way. |
||
) | ||
await client._state.initialize() | ||
await client._update_metadata() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network | ||
from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient | ||
from crawlee.proxy_configuration import ProxyInfo | ||
from crawlee.statistics import Statistics | ||
from crawlee.storages import KeyValueStore | ||
from tests.unit.server import TestServer, app, serve_in_thread | ||
|
||
|
@@ -69,6 +70,10 @@ def _prepare_test_env() -> None: | |
# Verify that the test environment was set up correctly. | ||
assert os.environ.get('CRAWLEE_STORAGE_DIR') == str(tmp_path) | ||
|
||
# Reset global class variables to ensure test isolation. | ||
KeyValueStore._autosaved_values = {} | ||
Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this contribute towards test isolation? Is there anything that depends on the persist state key that is derived from the ID? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The answer is I am not sure. But we have so many tests, so I think it is best if we restore all we can to the same state at the beginning of the test. This reduces the chance of some weird behavior based on the order of the test execution. |
||
|
||
return _prepare_test_env | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a fresh filesystem storage client to open a request queue feels wrong - at this point, we can be pretty sure that another one already exists. Is there a specific reason to do this or is it just because you don't have access to the existing one?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At that point, we are not sure it exists. It could have been created through a class method without the client and even when created with the help of client, it is out of scope:
await FileSystemRequestQueueClient.open(...)
And why not open the KVS through such a class method as well? Because that way, you bypass the storage instance manager - and that is generally something we do not want.
FileSystemStorageClient is just a helper factory class, which is mainly for convenience and for registering the storage instance manager.