From 9990dcab15ab26d04167bb0a3aa6b6a62564e46f Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 9 Sep 2025 15:40:19 +0200 Subject: [PATCH 1/8] feat: Add support for NDU storages --- src/crawlee/_types.py | 26 +- src/crawlee/crawlers/_basic/_basic_crawler.py | 28 +- .../storage_clients/_base/_storage_client.py | 3 + .../_file_system/_dataset_client.py | 48 +- .../_file_system/_key_value_store_client.py | 50 +- .../_file_system/_request_queue_client.py | 49 +- .../_file_system/_storage_client.py | 9 +- .../_memory/_dataset_client.py | 26 +- .../_memory/_key_value_store_client.py | 26 +- .../_memory/_request_queue_client.py | 26 +- .../_memory/_storage_client.py | 9 +- src/crawlee/storages/_base.py | 4 +- src/crawlee/storages/_dataset.py | 2 + src/crawlee/storages/_key_value_store.py | 2 + src/crawlee/storages/_request_queue.py | 2 + .../storages/_storage_instance_manager.py | 73 +- tests/unit/storages/test_dataset.py | 493 ++++++++++++- tests/unit/storages/test_key_value_store.py | 528 +++++++++++++- tests/unit/storages/test_request_queue.py | 680 +++++++++++++++++- 19 files changed, 2018 insertions(+), 66 deletions(-) diff --git a/src/crawlee/_types.py b/src/crawlee/_types.py index 42ac9412bd..b5ce90491e 100644 --- a/src/crawlee/_types.py +++ b/src/crawlee/_types.py @@ -189,6 +189,7 @@ class PushDataFunctionCall(PushDataKwargs): data: list[dict[str, Any]] | dict[str, Any] dataset_id: str | None dataset_name: str | None + dataset_alias: str | None class KeyValueStoreInterface(Protocol): @@ -255,7 +256,7 @@ def __init__(self, *, key_value_store_getter: GetKeyValueStoreFunction) -> None: self._key_value_store_getter = key_value_store_getter self.add_requests_calls = list[AddRequestsKwargs]() self.push_data_calls = list[PushDataFunctionCall]() - self.key_value_store_changes = dict[tuple[str | None, str | None], KeyValueStoreChangeRecords]() + self.key_value_store_changes = dict[tuple[str | None, str | None, str | None], KeyValueStoreChangeRecords]() async def add_requests( self, @@ -270,6 +271,7 @@ async def push_data( data: list[dict[str, Any]] | dict[str, Any], dataset_id: str | None = None, dataset_name: str | None = None, + dataset_alias: str | None = None, **kwargs: Unpack[PushDataKwargs], ) -> None: """Track a call to the `push_data` context helper.""" @@ -278,6 +280,7 @@ async def push_data( data=data, dataset_id=dataset_id, dataset_name=dataset_name, + dataset_alias=dataset_alias, **kwargs, ) ) @@ -287,13 +290,14 @@ async def get_key_value_store( *, id: str | None = None, name: str | None = None, + alias: str | None = None, ) -> KeyValueStoreInterface: - if (id, name) not in self.key_value_store_changes: - self.key_value_store_changes[id, name] = KeyValueStoreChangeRecords( - await self._key_value_store_getter(id=id, name=name) + if (id, name, alias) not in self.key_value_store_changes: + self.key_value_store_changes[id, name, alias] = KeyValueStoreChangeRecords( + await self._key_value_store_getter(id=id, name=name, alias=alias) ) - return self.key_value_store_changes[id, name] + return self.key_value_store_changes[id, name, alias] @docs_group('Functions') @@ -424,12 +428,14 @@ def __call__( *, id: str | None = None, name: str | None = None, + alias: str | None = None, ) -> Coroutine[None, None, KeyValueStore]: """Call dunder method. Args: id: The ID of the `KeyValueStore` to get. - name: The name of the `KeyValueStore` to get. + name: The name of the `KeyValueStore` to get (global scope). + alias: The alias of the `KeyValueStore` to get (run scope, unnamed). 
""" @@ -444,12 +450,14 @@ def __call__( *, id: str | None = None, name: str | None = None, + alias: str | None = None, ) -> Coroutine[None, None, KeyValueStoreInterface]: """Call dunder method. Args: id: The ID of the `KeyValueStore` to get. - name: The name of the `KeyValueStore` to get. + name: The name of the `KeyValueStore` to get (global scope). + alias: The alias of the `KeyValueStore` to get (run scope, unnamed). """ @@ -466,6 +474,7 @@ def __call__( data: list[dict[str, Any]] | dict[str, Any], dataset_id: str | None = None, dataset_name: str | None = None, + dataset_alias: str | None = None, **kwargs: Unpack[PushDataKwargs], ) -> Coroutine[None, None, None]: """Call dunder method. @@ -473,7 +482,8 @@ def __call__( Args: data: The data to push to the `Dataset`. dataset_id: The ID of the `Dataset` to push the data to. - dataset_name: The name of the `Dataset` to push the data to. + dataset_name: The name of the `Dataset` to push the data to (global scope). + dataset_alias: The alias of the `Dataset` to push the data to (run scope, unnamed). **kwargs: Additional keyword arguments. """ diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 29498d8c26..e7d983b1fe 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -557,18 +557,20 @@ async def get_dataset( *, id: str | None = None, name: str | None = None, + alias: str | None = None, ) -> Dataset: """Return the `Dataset` with the given ID or name. If none is provided, return the default one.""" - return await Dataset.open(id=id, name=name) + return await Dataset.open(id=id, name=name, alias=alias) async def get_key_value_store( self, *, id: str | None = None, name: str | None = None, + alias: str | None = None, ) -> KeyValueStore: """Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS.""" - return await KeyValueStore.open(id=id, name=name) + return await KeyValueStore.open(id=id, name=name, alias=alias) def error_handler( self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext] @@ -772,6 +774,7 @@ async def get_data( self, dataset_id: str | None = None, dataset_name: str | None = None, + dataset_alias: str | None = None, **kwargs: Unpack[GetDataKwargs], ) -> DatasetItemsListPage: """Retrieve data from a `Dataset`. @@ -781,13 +784,14 @@ async def get_data( Args: dataset_id: The ID of the `Dataset`. - dataset_name: The name of the `Dataset`. + dataset_name: The name of the `Dataset` (global scope). + dataset_alias: The alias of the `Dataset` (run scope, unnamed). kwargs: Keyword arguments to be passed to the `Dataset.get_data()` method. Returns: The retrieved data. """ - dataset = await Dataset.open(id=dataset_id, name=dataset_name) + dataset = await Dataset.open(id=dataset_id, name=dataset_name, alias=dataset_alias) return await dataset.get_data(**kwargs) async def export_data( @@ -795,6 +799,7 @@ async def export_data( path: str | Path, dataset_id: str | None = None, dataset_name: str | None = None, + dataset_alias: str | None = None, ) -> None: """Export all items from a Dataset to a JSON or CSV file. @@ -805,9 +810,10 @@ async def export_data( Args: path: The destination file path. Must end with '.json' or '.csv'. dataset_id: The ID of the Dataset to export from. If None, uses `name` parameter instead. - dataset_name: The name of the Dataset to export from. If None, uses `id` parameter instead. 
+ dataset_name: The name of the Dataset to export from (global scope). If None, uses `id` parameter instead. + dataset_alias: The alias of the Dataset to export from (run scope, unnamed). """ - dataset = await self.get_dataset(id=dataset_id, name=dataset_name) + dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias) path = path if isinstance(path, Path) else Path(path) dst = path.open('w', newline='') @@ -824,6 +830,7 @@ async def _push_data( data: list[dict[str, Any]] | dict[str, Any], dataset_id: str | None = None, dataset_name: str | None = None, + dataset_alias: str | None = None, **kwargs: Unpack[PushDataKwargs], ) -> None: """Push data to a `Dataset`. @@ -834,10 +841,11 @@ async def _push_data( Args: data: The data to push to the `Dataset`. dataset_id: The ID of the `Dataset`. - dataset_name: The name of the `Dataset`. + dataset_name: The name of the `Dataset` (global scope). + dataset_alias: The alias of the `Dataset` (run scope, unnamed). kwargs: Keyword arguments to be passed to the `Dataset.push_data()` method. """ - dataset = await self.get_dataset(id=dataset_id, name=dataset_name) + dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias) await dataset.push_data(data, **kwargs) def _should_retry_request(self, context: BasicCrawlingContext, error: Exception) -> bool: @@ -1226,8 +1234,8 @@ async def _commit_key_value_store_changes( result: RequestHandlerRunResult, get_kvs: GetKeyValueStoreFromRequestHandlerFunction ) -> None: """Store key value store changes recorded in result.""" - for (id, name), changes in result.key_value_store_changes.items(): - store = await get_kvs(id=id, name=name) + for (id, name, alias), changes in result.key_value_store_changes.items(): + store = await get_kvs(id=id, name=name, alias=alias) for key, value in changes.updates.items(): await store.set_value(key, value.content, value.content_type) diff --git a/src/crawlee/storage_clients/_base/_storage_client.py b/src/crawlee/storage_clients/_base/_storage_client.py index 9be648a0e5..fba8507337 100644 --- a/src/crawlee/storage_clients/_base/_storage_client.py +++ b/src/crawlee/storage_clients/_base/_storage_client.py @@ -34,6 +34,7 @@ async def create_dataset_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> DatasetClient: """Create a dataset client.""" @@ -44,6 +45,7 @@ async def create_kvs_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> KeyValueStoreClient: """Create a key-value store client.""" @@ -54,6 +56,7 @@ async def create_rq_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> RequestQueueClient: """Create a request queue client.""" diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index a3dae65a34..152bb8a234 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -58,6 +58,7 @@ def __init__( metadata: DatasetMetadata, storage_dir: Path, lock: asyncio.Lock, + directory_name: str | None = None, ) -> None: """Initialize a new instance. 
@@ -68,6 +69,9 @@ def __init__( self._storage_dir = storage_dir """The base directory where the storage data are being persisted.""" + self._directory_name = directory_name + """The directory name to use for this dataset. If None, uses metadata.name or default.""" + self._lock = lock """A lock to ensure that only one operation is performed at a time.""" @@ -78,6 +82,10 @@ async def get_metadata(self) -> DatasetMetadata: @property def path_to_dataset(self) -> Path: """The full path to the dataset directory.""" + # Use the explicit directory name if provided, otherwise fall back to metadata.name or default + if self._directory_name is not None: + return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name + if self._metadata.name is None: return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT @@ -94,6 +102,7 @@ async def open( *, id: str | None, name: str | None, + alias: str | None = None, configuration: Configuration, ) -> FileSystemDatasetClient: """Open or create a file system dataset client. @@ -104,15 +113,21 @@ async def open( Args: id: The ID of the dataset to open. If provided, searches for existing dataset by ID. - name: The name of the dataset to open. If not provided, uses the default dataset. + name: The name of the dataset for named storages. Mutually exclusive with alias. + alias: The alias of the dataset for unnamed storages. Mutually exclusive with name. configuration: The configuration object containing storage directory settings. Returns: An instance for the opened or created storage client. Raises: - ValueError: If a dataset with the specified ID is not found, or if metadata is invalid. + ValueError: If a dataset with the specified ID is not found, if metadata is invalid, + or if both name and alias are provided. """ + # Validate parameters - exactly one of name or alias should be provided (or neither for default) + if name is not None and alias is not None: + raise ValueError('Cannot specify both name and alias parameters') + storage_dir = Path(configuration.storage_dir) dataset_base_path = storage_dir / cls._STORAGE_SUBDIR @@ -140,6 +155,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=dataset_dir.name, # Use the actual directory name ) await client._update_metadata(update_accessed_at=True) found = True @@ -152,10 +168,27 @@ async def open( if not found: raise ValueError(f'Dataset with ID "{id}" not found') - # Get a new instance by name. + # Get a new instance by name or alias. 
else: + # Determine the directory name and metadata name based on whether this is a named or alias storage + if alias is not None: + # For alias storages, use the alias as directory name and set metadata.name to None + # Special case: alias='default' should use the same directory as default storage + directory_name = None if alias == 'default' else alias + actual_name = None + elif name is not None: + # For named storages, use the name as both directory name and metadata.name + directory_name = name + actual_name = name + else: + # For default storage (no name or alias), use None for both - same as alias='default' + directory_name = None + actual_name = None + dataset_path = ( - dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else dataset_base_path / name + dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT + if directory_name is None + else dataset_base_path / directory_name ) metadata_path = dataset_path / METADATA_FILENAME @@ -168,6 +201,9 @@ async def open( await asyncio.to_thread(file.close) try: metadata = DatasetMetadata(**file_content) + # For aliases, ensure the metadata.name is None + if alias is not None: + metadata = metadata.model_copy(update={'name': None}) except ValidationError as exc: raise ValueError(f'Invalid metadata file for dataset "{name}"') from exc @@ -175,6 +211,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=directory_name, ) await client._update_metadata(update_accessed_at=True) @@ -184,7 +221,7 @@ async def open( now = datetime.now(timezone.utc) metadata = DatasetMetadata( id=crypto_random_object_id(), - name=name, + name=actual_name, # Use actual_name which will be None for aliases created_at=now, accessed_at=now, modified_at=now, @@ -194,6 +231,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=directory_name, ) await client._update_metadata() diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index bc94980bcc..d8fc21d17b 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -57,6 +57,7 @@ def __init__( metadata: KeyValueStoreMetadata, storage_dir: Path, lock: asyncio.Lock, + directory_name: str | None = None, ) -> None: """Initialize a new instance. @@ -67,6 +68,9 @@ def __init__( self._storage_dir = storage_dir """The base directory where the storage data are being persisted.""" + self._directory_name = directory_name + """The directory name to use for this key-value store. If None, uses metadata.name or default.""" + self._lock = lock """A lock to ensure that only one operation is performed at a time.""" @@ -77,6 +81,10 @@ async def get_metadata(self) -> KeyValueStoreMetadata: @property def path_to_kvs(self) -> Path: """The full path to the key-value store directory.""" + # Use the explicit directory name if provided, otherwise fall back to metadata.name or default + if self._directory_name is not None: + return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name + if self._metadata.name is None: return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT @@ -93,6 +101,7 @@ async def open( *, id: str | None, name: str | None, + alias: str | None = None, configuration: Configuration, ) -> FileSystemKeyValueStoreClient: """Open or create a file system key-value store client. 
@@ -103,15 +112,21 @@ async def open( Args: id: The ID of the key-value store to open. If provided, searches for existing store by ID. - name: The name of the key-value store to open. If not provided, uses the default store. + name: The name of the key-value store for named storages. Mutually exclusive with alias. + alias: The alias of the key-value store for unnamed storages. Mutually exclusive with name. configuration: The configuration object containing storage directory settings. Returns: An instance for the opened or created storage client. Raises: - ValueError: If a store with the specified ID is not found, or if metadata is invalid. + ValueError: If a store with the specified ID is not found, if metadata is invalid, + or if both name and alias are provided. """ + # Validate parameters - exactly one of name or alias should be provided (or neither for default) + if name is not None and alias is not None: + raise ValueError('Cannot specify both name and alias parameters') + storage_dir = Path(configuration.storage_dir) kvs_base_path = storage_dir / cls._STORAGE_SUBDIR @@ -139,6 +154,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=kvs_dir.name, # Use the actual directory name ) await client._update_metadata(update_accessed_at=True) found = True @@ -151,9 +167,28 @@ async def open( if not found: raise ValueError(f'Key-value store with ID "{id}" not found.') - # Get a new instance by name. + # Get a new instance by name or alias. else: - kvs_path = kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else kvs_base_path / name + # Determine the directory name and metadata name based on whether this is a named or alias storage + if alias is not None: + # For alias storages, use the alias as directory name and set metadata.name to None + # Special case: alias='default' should use the same directory as default storage + directory_name = None if alias == 'default' else alias + actual_name = None + elif name is not None: + # For named storages, use the name as both directory name and metadata.name + directory_name = name + actual_name = name + else: + # For default storage (no name or alias), use None for both - same as alias='default' + directory_name = None + actual_name = None + + kvs_path = ( + kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT + if directory_name is None + else kvs_base_path / directory_name + ) metadata_path = kvs_path / METADATA_FILENAME # If the key-value store directory exists, reconstruct the client from the metadata file. 
@@ -165,6 +200,9 @@ async def open( await asyncio.to_thread(file.close) try: metadata = KeyValueStoreMetadata(**file_content) + # For aliases, ensure the metadata.name is None + if alias is not None: + metadata = metadata.model_copy(update={'name': None}) except ValidationError as exc: raise ValueError(f'Invalid metadata file for key-value store "{name}"') from exc @@ -172,6 +210,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=directory_name, ) await client._update_metadata(update_accessed_at=True) @@ -181,7 +220,7 @@ async def open( now = datetime.now(timezone.utc) metadata = KeyValueStoreMetadata( id=crypto_random_object_id(), - name=name, + name=actual_name, # Use actual_name which will be None for aliases created_at=now, accessed_at=now, modified_at=now, @@ -190,6 +229,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=directory_name, ) await client._update_metadata() diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index c606af2aa5..73b2656d07 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -91,6 +91,7 @@ def __init__( metadata: RequestQueueMetadata, storage_dir: Path, lock: asyncio.Lock, + directory_name: str | None = None, ) -> None: """Initialize a new instance. @@ -101,6 +102,9 @@ def __init__( self._storage_dir = storage_dir """The base directory where the storage data are being persisted.""" + self._directory_name = directory_name + """The directory name to use for this request queue. If None, uses metadata.name or default.""" + self._lock = lock """A lock to ensure that only one operation is performed at a time.""" @@ -129,6 +133,10 @@ async def get_metadata(self) -> RequestQueueMetadata: @property def path_to_rq(self) -> Path: """The full path to the request queue directory.""" + # Use the explicit directory name if provided, otherwise fall back to metadata.name or default + if self._directory_name is not None: + return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name + if self._metadata.name is None: return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT @@ -145,6 +153,7 @@ async def open( *, id: str | None, name: str | None, + alias: str | None = None, configuration: Configuration, ) -> FileSystemRequestQueueClient: """Open or create a file system request queue client. @@ -155,15 +164,21 @@ async def open( Args: id: The ID of the request queue to open. If provided, searches for existing queue by ID. - name: The name of the request queue to open. If not provided, uses the default queue. + name: The name of the request queue for named storages. Mutually exclusive with alias. + alias: The alias of the request queue for unnamed storages. Mutually exclusive with name. configuration: The configuration object containing storage directory settings. Returns: An instance for the opened or created storage client. Raises: - ValueError: If a queue with the specified ID is not found, or if metadata is invalid. + ValueError: If a queue with the specified ID is not found, if metadata is invalid, + or if both name and alias are provided. 
""" + # Validate parameters - exactly one of name or alias should be provided (or neither for default) + if name is not None and alias is not None: + raise ValueError('Cannot specify both name and alias parameters') + storage_dir = Path(configuration.storage_dir) rq_base_path = storage_dir / cls._STORAGE_SUBDIR @@ -192,6 +207,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=rq_dir.name, # Use the actual directory name ) await client._state.initialize() await client._discover_existing_requests() @@ -206,9 +222,28 @@ async def open( if not found: raise ValueError(f'Request queue with ID "{id}" not found') - # Open an existing RQ by its name, or create a new one if not found. + # Open an existing RQ by its name or alias, or create a new one if not found. else: - rq_path = rq_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else rq_base_path / name + # Determine the directory name and metadata name based on whether this is a named or alias storage + if alias is not None: + # For alias storages, use the alias as directory name and set metadata.name to None + # Special case: alias='default' should use the same directory as default storage + directory_name = None if alias == 'default' else alias + actual_name = None + elif name is not None: + # For named storages, use the name as both directory name and metadata.name + directory_name = name + actual_name = name + else: + # For default storage (no name or alias), use None for both - same as alias='default' + directory_name = None + actual_name = None + + rq_path = ( + rq_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT + if directory_name is None + else rq_base_path / directory_name + ) metadata_path = rq_path / METADATA_FILENAME # If the RQ directory exists, reconstruct the client from the metadata file. 
@@ -223,12 +258,13 @@ async def open( except ValidationError as exc: raise ValueError(f'Invalid metadata file for request queue "{name}"') from exc - metadata.name = name + metadata.name = actual_name # Use actual_name which will be None for aliases client = cls( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=directory_name, ) await client._state.initialize() @@ -240,7 +276,7 @@ async def open( now = datetime.now(timezone.utc) metadata = RequestQueueMetadata( id=crypto_random_object_id(), - name=name, + name=actual_name, # Use actual_name which will be None for aliases created_at=now, accessed_at=now, modified_at=now, @@ -253,6 +289,7 @@ async def open( metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), + directory_name=directory_name, ) await client._state.initialize() await client._update_metadata() diff --git a/src/crawlee/storage_clients/_file_system/_storage_client.py b/src/crawlee/storage_clients/_file_system/_storage_client.py index 86903ea4e7..94f183db2a 100644 --- a/src/crawlee/storage_clients/_file_system/_storage_client.py +++ b/src/crawlee/storage_clients/_file_system/_storage_client.py @@ -35,10 +35,11 @@ async def create_dataset_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> FileSystemDatasetClient: configuration = configuration or Configuration.get_global_configuration() - client = await FileSystemDatasetClient.open(id=id, name=name, configuration=configuration) + client = await FileSystemDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration) await self._purge_if_needed(client, configuration) return client @@ -48,10 +49,11 @@ async def create_kvs_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> FileSystemKeyValueStoreClient: configuration = configuration or Configuration.get_global_configuration() - client = await FileSystemKeyValueStoreClient.open(id=id, name=name, configuration=configuration) + client = await FileSystemKeyValueStoreClient.open(id=id, name=name, alias=alias, configuration=configuration) await self._purge_if_needed(client, configuration) return client @@ -61,9 +63,10 @@ async def create_rq_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> FileSystemRequestQueueClient: configuration = configuration or Configuration.get_global_configuration() - client = await FileSystemRequestQueueClient.open(id=id, name=name, configuration=configuration) + client = await FileSystemRequestQueueClient.open(id=id, name=name, alias=alias, configuration=configuration) await self._purge_if_needed(client, configuration) return client diff --git a/src/crawlee/storage_clients/_memory/_dataset_client.py b/src/crawlee/storage_clients/_memory/_dataset_client.py index 8426319acf..a28732b884 100644 --- a/src/crawlee/storage_clients/_memory/_dataset_client.py +++ b/src/crawlee/storage_clients/_memory/_dataset_client.py @@ -53,6 +53,7 @@ async def open( *, id: str | None, name: str | None, + alias: str | None = None, ) -> MemoryDatasetClient: """Open or create a new memory dataset client. @@ -62,18 +63,37 @@ async def open( Args: id: The ID of the dataset. If not provided, a random ID will be generated. - name: The name of the dataset. If not provided, the dataset will be unnamed. + name: The name of the dataset for named storages. Mutually exclusive with alias. 
+            alias: The alias of the dataset for unnamed storages. Mutually exclusive with name.
 
         Returns:
             An instance for the opened or created storage client.
+
+        Raises:
+            ValueError: If both name and alias are provided.
         """
-        # Otherwise create a new dataset
+        # Validate parameters - exactly one of name or alias should be provided (or neither for default)
+        if name is not None and alias is not None:
+            raise ValueError('Cannot specify both name and alias parameters')
+
+        # Determine the actual name to use in metadata
+        if alias is not None:
+            # For alias storages, metadata.name should be None (unnamed storage)
+            actual_name = None
+        elif name is not None:
+            # For named storages, use the provided name
+            actual_name = name
+        else:
+            # For default storage (no name or alias), use None
+            actual_name = None
+
+        # Create a new dataset
         dataset_id = id or crypto_random_object_id()
         now = datetime.now(timezone.utc)
         metadata = DatasetMetadata(
             id=dataset_id,
-            name=name,
+            name=actual_name,
             created_at=now,
             accessed_at=now,
             modified_at=now,
diff --git a/src/crawlee/storage_clients/_memory/_key_value_store_client.py b/src/crawlee/storage_clients/_memory/_key_value_store_client.py
index 7dacf6d95d..b936e864a7 100644
--- a/src/crawlee/storage_clients/_memory/_key_value_store_client.py
+++ b/src/crawlee/storage_clients/_memory/_key_value_store_client.py
@@ -51,6 +51,7 @@ async def open(
         *,
         id: str | None,
         name: str | None,
+        alias: str | None = None,
     ) -> MemoryKeyValueStoreClient:
         """Open or create a new memory key-value store client.
@@ -60,18 +61,37 @@ async def open(
         Args:
             id: The ID of the key-value store. If not provided, a random ID will be generated.
-            name: The name of the key-value store. If not provided, the store will be unnamed.
+            name: The name of the key-value store for named storages. Mutually exclusive with alias.
+            alias: The alias of the key-value store for unnamed storages. Mutually exclusive with name.
 
         Returns:
             An instance for the opened or created storage client.
+
+        Raises:
+            ValueError: If both name and alias are provided.
         """
-        # Otherwise create a new key-value store
+        # Validate parameters - exactly one of name or alias should be provided (or neither for default)
+        if name is not None and alias is not None:
+            raise ValueError('Cannot specify both name and alias parameters')
+
+        # Determine the actual name to use in metadata
+        if alias is not None:
+            # For alias storages, metadata.name should be None (unnamed storage)
+            actual_name = None
+        elif name is not None:
+            # For named storages, use the provided name
+            actual_name = name
+        else:
+            # For default storage (no name or alias), use None
+            actual_name = None
+
+        # Create a new key-value store
         store_id = id or crypto_random_object_id()
         now = datetime.now(timezone.utc)
         metadata = KeyValueStoreMetadata(
             id=store_id,
-            name=name,
+            name=actual_name,
             created_at=now,
             accessed_at=now,
             modified_at=now,
diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py
index 2e48eceba7..ba005029c2 100644
--- a/src/crawlee/storage_clients/_memory/_request_queue_client.py
+++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py
@@ -63,6 +63,7 @@ async def open(
         *,
         id: str | None,
         name: str | None,
+        alias: str | None = None,
     ) -> MemoryRequestQueueClient:
         """Open or create a new memory request queue client.
@@ -72,18 +73,37 @@ async def open(
         Args:
             id: The ID of the request queue.
If not provided, a random ID will be generated. - name: The name of the request queue. If not provided, the queue will be unnamed. + name: The name of the request queue for named storages. Mutually exclusive with alias. + alias: The alias of the request queue for unnamed storages. Mutually exclusive with name. Returns: An instance for the opened or created storage client. + + Raises: + ValueError: If both name and alias are provided. """ - # Otherwise create a new queue + # Validate parameters - exactly one of name or alias should be provided (or neither for default) + if name is not None and alias is not None: + raise ValueError('Cannot specify both name and alias parameters') + + # Determine the actual name to use in metadata + if alias is not None: + # For alias storages, metadata.name should be None (unnamed storage) + actual_name = None + elif name is not None: + # For named storages, use the provided name + actual_name = name + else: + # For default storage (no name or alias), use None + actual_name = None + + # Create a new queue queue_id = id or crypto_random_object_id() now = datetime.now(timezone.utc) metadata = RequestQueueMetadata( id=queue_id, - name=name, + name=actual_name, created_at=now, accessed_at=now, modified_at=now, diff --git a/src/crawlee/storage_clients/_memory/_storage_client.py b/src/crawlee/storage_clients/_memory/_storage_client.py index f4ac73e489..fa8ff3589e 100644 --- a/src/crawlee/storage_clients/_memory/_storage_client.py +++ b/src/crawlee/storage_clients/_memory/_storage_client.py @@ -33,10 +33,11 @@ async def create_dataset_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> MemoryDatasetClient: configuration = configuration or Configuration.get_global_configuration() - client = await MemoryDatasetClient.open(id=id, name=name) + client = await MemoryDatasetClient.open(id=id, name=name, alias=alias) await self._purge_if_needed(client, configuration) return client @@ -46,10 +47,11 @@ async def create_kvs_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> MemoryKeyValueStoreClient: configuration = configuration or Configuration.get_global_configuration() - client = await MemoryKeyValueStoreClient.open(id=id, name=name) + client = await MemoryKeyValueStoreClient.open(id=id, name=name, alias=alias) await self._purge_if_needed(client, configuration) return client @@ -59,9 +61,10 @@ async def create_rq_client( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, ) -> MemoryRequestQueueClient: configuration = configuration or Configuration.get_global_configuration() - client = await MemoryRequestQueueClient.open(id=id, name=name) + client = await MemoryRequestQueueClient.open(id=id, name=name, alias=alias) await self._purge_if_needed(client, configuration) return client diff --git a/src/crawlee/storages/_base.py b/src/crawlee/storages/_base.py index c63f1dda37..c0c4ee100a 100644 --- a/src/crawlee/storages/_base.py +++ b/src/crawlee/storages/_base.py @@ -36,6 +36,7 @@ async def open( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, storage_client: StorageClient | None = None, ) -> Storage: @@ -43,7 +44,8 @@ async def open( Args: id: The storage ID. - name: The storage name. + name: The storage name (global scope, persists across runs). 
+ alias: The storage alias (run scope, creates unnamed storage). configuration: Configuration object used during the storage creation or restoration process. storage_client: Underlying storage client to use. If not provided, the default global storage client from the service locator will be used. diff --git a/src/crawlee/storages/_dataset.py b/src/crawlee/storages/_dataset.py index 36fbea8a7a..3e4b4c4981 100644 --- a/src/crawlee/storages/_dataset.py +++ b/src/crawlee/storages/_dataset.py @@ -100,6 +100,7 @@ async def open( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, storage_client: StorageClient | None = None, ) -> Dataset: @@ -110,6 +111,7 @@ async def open( cls, id=id, name=name, + alias=alias, configuration=configuration, client_opener=storage_client.create_dataset_client, ) diff --git a/src/crawlee/storages/_key_value_store.py b/src/crawlee/storages/_key_value_store.py index 5297925a37..ee38166f23 100644 --- a/src/crawlee/storages/_key_value_store.py +++ b/src/crawlee/storages/_key_value_store.py @@ -112,6 +112,7 @@ async def open( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, storage_client: StorageClient | None = None, ) -> KeyValueStore: @@ -122,6 +123,7 @@ async def open( cls, id=id, name=name, + alias=alias, configuration=configuration, client_opener=storage_client.create_kvs_client, ) diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index 068b5135f0..0141a52e06 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -118,6 +118,7 @@ async def open( *, id: str | None = None, name: str | None = None, + alias: str | None = None, configuration: Configuration | None = None, storage_client: StorageClient | None = None, ) -> RequestQueue: @@ -128,6 +129,7 @@ async def open( cls, id=id, name=name, + alias=alias, configuration=configuration, client_opener=storage_client.create_rq_client, ) diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index 130a2eec63..f07933f516 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Awaitable, Callable -from typing import TYPE_CHECKING, TypeVar, cast +from typing import TYPE_CHECKING, TypeVar from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient @@ -26,6 +26,8 @@ class StorageInstanceManager: and provides a unified interface for opening and managing storage instances. 
""" + _DEFAULT_STORAGE = 'default' + def __init__(self) -> None: self._cache_by_id = dict[type[Storage], dict[str, Storage]]() """Cache for storage instances by ID, separated by storage type.""" @@ -33,6 +35,9 @@ def __init__(self) -> None: self._cache_by_name = dict[type[Storage], dict[str, Storage]]() """Cache for storage instances by name, separated by storage type.""" + self._cache_by_alias = dict[type[Storage], dict[str, Storage]]() + """Cache for storage instances by alias, separated by storage type.""" + self._default_instances = dict[type[Storage], Storage]() """Cache for default instances of each storage type.""" @@ -42,6 +47,7 @@ async def open_storage_instance( *, id: str | None, name: str | None, + alias: str | None, configuration: Configuration, client_opener: ClientOpener, ) -> T: @@ -50,7 +56,8 @@ async def open_storage_instance( Args: cls: The storage class to instantiate. id: Storage ID. - name: Storage name. + name: Storage name (global scope, persists across runs). + alias: Storage alias (run scope, creates unnamed storage). configuration: Configuration object. client_opener: Function to create the storage client. @@ -58,14 +65,18 @@ async def open_storage_instance( The storage instance. Raises: - ValueError: If both id and name are specified. + ValueError: If multiple parameters out of `id`, `name`, and `alias` are specified. """ - if id and name: - raise ValueError('Only one of "id" or "name" can be specified, not both.') + # Validate parameters + specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - # Check for default instance - if id is None and name is None and cls in self._default_instances: - return cast('T', self._default_instances[cls]) + # Auto-set alias='default' when no parameters are specified. + # Default unnamed storage is equal to alias=default unnamed storage. + if specified_params == 0: + alias = self._DEFAULT_STORAGE + specified_params = 1 # Check cache if id is not None: @@ -82,8 +93,38 @@ async def open_storage_instance( if isinstance(cached_instance, cls): return cached_instance + if alias is not None: + type_cache_by_alias = self._cache_by_alias.get(cls, {}) + if alias in type_cache_by_alias: + cached_instance = type_cache_by_alias[alias] + if isinstance(cached_instance, cls): + return cached_instance + + # Check for conflicts between named and alias storages + if name is not None: + # Check if there's already an alias storage with the same identifier + type_cache_by_alias = self._cache_by_alias.get(cls, {}) + if name in type_cache_by_alias: + raise ValueError( + f'Cannot create named storage "{name}" because an alias storage with the same name already exists. ' + f'Use a different name or drop the existing alias storage first.' + ) + + if alias is not None: + # Check if there's already a named storage with the same identifier + type_cache_by_name = self._cache_by_name.get(cls, {}) + if alias in type_cache_by_name: + raise ValueError( + f'Cannot create alias storage "{alias}" because a named storage with the same name already exists. ' + f'Use a different alias or drop the existing named storage first.' 
+ ) + # Create new instance - client = await client_opener(id=id, name=name, configuration=configuration) + # Pass the correct parameters to the storage client + if alias is not None: + client = await client_opener(id=id, name=None, alias=alias, configuration=configuration) + else: + client = await client_opener(id=id, name=name, configuration=configuration) metadata = await client.get_metadata() instance = cls(client, metadata.id, metadata.name) # type: ignore[call-arg] @@ -92,14 +133,13 @@ async def open_storage_instance( # Cache the instance type_cache_by_id = self._cache_by_id.setdefault(cls, {}) type_cache_by_name = self._cache_by_name.setdefault(cls, {}) + type_cache_by_alias = self._cache_by_alias.setdefault(cls, {}) type_cache_by_id[instance.id] = instance if instance_name is not None: type_cache_by_name[instance_name] = instance - - # Set as default if no id/name specified - if id is None and name is None: - self._default_instances[cls] = instance + if alias is not None: + type_cache_by_alias[alias] = instance return instance @@ -122,6 +162,12 @@ def remove_from_cache(self, storage_instance: Storage) -> None: if storage_instance.name in type_cache_by_name: del type_cache_by_name[storage_instance.name] + # Remove from alias cache - need to search by instance since alias is not stored on the instance + type_cache_by_alias = self._cache_by_alias.get(storage_type, {}) + aliases_to_remove = [alias for alias, instance in type_cache_by_alias.items() if instance is storage_instance] + for alias in aliases_to_remove: + del type_cache_by_alias[alias] + # Remove from default instances if storage_type in self._default_instances and self._default_instances[storage_type] is storage_instance: del self._default_instances[storage_type] @@ -130,4 +176,5 @@ def clear_cache(self) -> None: """Clear all cached storage instances.""" self._cache_by_id.clear() self._cache_by_name.clear() + self._cache_by_alias.clear() self._default_instances.clear() diff --git a/tests/unit/storages/test_dataset.py b/tests/unit/storages/test_dataset.py index a57d525d56..3858e796ff 100644 --- a/tests/unit/storages/test_dataset.py +++ b/tests/unit/storages/test_dataset.py @@ -7,6 +7,7 @@ import pytest +from crawlee import service_locator from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient from crawlee.storages import Dataset, KeyValueStore @@ -178,7 +179,7 @@ async def test_open_with_id_and_name( configuration: Configuration, ) -> None: """Test that open() raises an error when both id and name are provided.""" - with pytest.raises(ValueError, match=r'Only one of "id" or "name" can be specified'): + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): await Dataset.open( id='some-id', name='some-name', @@ -582,3 +583,493 @@ async def test_purge( # Clean up await dataset.drop() + + +async def test_open_with_alias( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test opening datasets with alias parameter for NDU functionality.""" + # Create datasets with different aliases + dataset_1 = await Dataset.open( + alias='test_alias_1', + storage_client=storage_client, + configuration=configuration, + ) + dataset_2 = await Dataset.open( + alias='test_alias_2', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify they have different IDs but no names (unnamed) + assert dataset_1.id != dataset_2.id + assert dataset_1.name is None + assert 
dataset_2.name is None + + # Add different data to each + await dataset_1.push_data({'source': 'alias_1', 'value': 1}) + await dataset_2.push_data({'source': 'alias_2', 'value': 2}) + + # Verify data isolation + data_1 = await dataset_1.get_data() + data_2 = await dataset_2.get_data() + + assert data_1.count == 1 + assert data_2.count == 1 + assert data_1.items[0]['source'] == 'alias_1' + assert data_2.items[0]['source'] == 'alias_2' + + # Clean up + await dataset_1.drop() + await dataset_2.drop() + + +async def test_alias_caching( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that datasets with same alias return same instance (cached).""" + # Open dataset with alias + dataset_1 = await Dataset.open( + alias='cache_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Open again with same alias + dataset_2 = await Dataset.open( + alias='cache_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be same instance + assert dataset_1 is dataset_2 + assert dataset_1.id == dataset_2.id + + # Clean up + await dataset_1.drop() + + +async def test_alias_with_id_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing both alias and id raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await Dataset.open( + id='some-id', + alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_name_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing both alias and name raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await Dataset.open( + name='some-name', + alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_all_parameters_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing id, name, and alias raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await Dataset.open( + id='some-id', + name='some-name', + alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_special_characters( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test alias functionality with special characters.""" + special_aliases = [ + 'alias-with-dashes', + 'alias_with_underscores', + 'alias.with.dots', + 'alias123with456numbers', + 'CamelCaseAlias', + ] + + datasets = [] + for alias in special_aliases: + dataset = await Dataset.open( + alias=alias, + storage_client=storage_client, + configuration=configuration, + ) + datasets.append(dataset) + + # Add data with the alias as identifier + await dataset.push_data({'alias_used': alias, 'test': 'special_chars'}) + + # Verify all work correctly + for i, dataset in enumerate(datasets): + data = await dataset.get_data() + assert data.count == 1 + assert data.items[0]['alias_used'] == special_aliases[i] + + # Clean up + for dataset in datasets: + await dataset.drop() + + +async def test_named_vs_alias_conflict_detection() -> None: + """Test that conflicts between named and alias storages are detected.""" + # Test 1: Create named storage first, then try alias with same name + 
named_dataset = await Dataset.open(name='conflict_test') + assert named_dataset.name == 'conflict_test' + + # Try to create alias with same name - should raise error + with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'): + await Dataset.open(alias='conflict_test') + + # Clean up + await named_dataset.drop() + + # Test 2: Create alias first, then try named with same name + alias_dataset = await Dataset.open(alias='conflict_test2') + assert alias_dataset.name is None # Alias storages have no name + + # Try to create named with same name - should raise error + with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'): + await Dataset.open(name='conflict_test2') + + # Clean up + await alias_dataset.drop() + + +async def test_alias_parameter( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test dataset creation and operations with alias parameter.""" + # Create dataset with alias + alias_dataset = await Dataset.open( + alias='test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify alias dataset properties + assert alias_dataset.id is not None + assert alias_dataset.name is None # Alias storages should be unnamed + + # Test data operations + await alias_dataset.push_data({'type': 'alias', 'value': 1}) + data = await alias_dataset.get_data() + assert data.count == 1 + assert data.items[0]['type'] == 'alias' + + await alias_dataset.drop() + + +async def test_alias_vs_named_isolation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that alias and named datasets with same identifier are isolated.""" + # Create named dataset + named_dataset = await Dataset.open( + name='test_identifier', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify named dataset + assert named_dataset.name == 'test_identifier' + await named_dataset.push_data({'type': 'named'}) + + # Clean up named dataset first + await named_dataset.drop() + + # Now create alias dataset with same identifier (should work after cleanup) + alias_dataset = await Dataset.open( + alias='test_identifier', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be different instance + assert alias_dataset.name is None + await alias_dataset.push_data({'type': 'alias'}) + + # Verify alias data + alias_data = await alias_dataset.get_data() + assert alias_data.items[0]['type'] == 'alias' + + await alias_dataset.drop() + + +async def test_default_vs_alias_default_equivalence( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that default dataset and alias='default' are equivalent.""" + # Open default dataset + default_dataset = await Dataset.open( + storage_client=storage_client, + configuration=configuration, + ) + + # Open alias='default' dataset + alias_default_dataset = await Dataset.open( + alias='default', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be the same + assert default_dataset.id == alias_default_dataset.id + assert default_dataset.name is None + assert alias_default_dataset.name is None + + # Data should be shared + await default_dataset.push_data({'source': 'default'}) + data = await alias_default_dataset.get_data() + assert data.items[0]['source'] == 'default' + + await default_dataset.drop() + + +async def test_alias_parameter_validation( + storage_client: StorageClient, + configuration: Configuration, +) -> 
None: + """Test alias parameter validation.""" + # Should not allow both name and alias + with pytest.raises(ValueError, match=r'Only one of'): + await Dataset.open( + name='test', + alias='test', + storage_client=storage_client, + configuration=configuration, + ) + + # Valid alias should work + alias_dataset = await Dataset.open( + alias='valid_alias', + storage_client=storage_client, + configuration=configuration, + ) + assert alias_dataset.name is None + await alias_dataset.drop() + + +async def test_multiple_alias_isolation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that different aliases create separate datasets.""" + datasets = [] + + for i in range(3): + dataset = await Dataset.open( + alias=f'alias_{i}', + storage_client=storage_client, + configuration=configuration, + ) + await dataset.push_data({'alias': f'alias_{i}', 'index': i}) + datasets.append(dataset) + + # All should be different + for i in range(3): + for j in range(i + 1, 3): + assert datasets[i].id != datasets[j].id + + # Verify data isolation + for i, dataset in enumerate(datasets): + data = await dataset.get_data() + assert data.items[0]['alias'] == f'alias_{i}' + await dataset.drop() + + +async def test_purge_on_start_enabled(storage_client: StorageClient) -> None: + """Test purge behavior when purge_on_start=True: named storages retain data, unnamed storages are purged.""" + + # Skip this test for memory storage since it doesn't persist data between client instances. + if storage_client.__class__ == MemoryStorageClient: + pytest.skip('Memory storage does not persist data between client instances.') + + configuration = Configuration(purge_on_start=True) + + # First, create all storage types with purge enabled and add data. + default_dataset = await Dataset.open( + storage_client=storage_client, + configuration=configuration, + ) + + alias_dataset = await Dataset.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + named_dataset = await Dataset.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + await default_dataset.push_data({'type': 'default', 'data': 'should_be_purged'}) + await alias_dataset.push_data({'type': 'alias', 'data': 'should_be_purged'}) + await named_dataset.push_data({'type': 'named', 'data': 'should_persist'}) + + # Verify data was added + default_data = await default_dataset.get_data() + alias_data = await alias_dataset.get_data() + named_data = await named_dataset.get_data() + + assert len(default_data.items) == 1 + assert len(alias_data.items) == 1 + assert len(named_data.items) == 1 + + # Verify that default and alias storages are unnamed + default_metadata = await default_dataset.get_metadata() + alias_metadata = await alias_dataset.get_metadata() + named_metadata = await named_dataset.get_metadata() + + assert default_metadata.name is None + assert alias_metadata.name is None + assert named_metadata.name == 'purge_test_named' + + # Clear storage cache to simulate "reopening" storages + service_locator.storage_instance_manager.clear_cache() + + # Now "reopen" all storages + default_dataset_2 = await Dataset.open( + storage_client=storage_client, + configuration=configuration, + ) + alias_dataset_2 = await Dataset.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + named_dataset_2 = await Dataset.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + 
+ # Check the data after purge + default_data_after = await default_dataset_2.get_data() + alias_data_after = await alias_dataset_2.get_data() + named_data_after = await named_dataset_2.get_data() + + # Unnamed storages (alias and default) should be purged (data removed) + assert len(default_data_after.items) == 0 + assert len(alias_data_after.items) == 0 + + # Named storage should retain data (not purged) + assert len(named_data_after.items) == 1 + + # Clean up + await named_dataset_2.drop() + await alias_dataset_2.drop() + await default_dataset_2.drop() + + +async def test_purge_on_start_disabled(storage_client: StorageClient) -> None: + """Test purge behavior when purge_on_start=False: all storages retain data regardless of type.""" + + # Skip this test for memory storage since it doesn't persist data between client instances. + if storage_client.__class__ == MemoryStorageClient: + pytest.skip('Memory storage does not persist data between client instances.') + + configuration = Configuration(purge_on_start=False) + + # First, create all storage types with purge disabled and add data. + default_dataset = await Dataset.open( + storage_client=storage_client, + configuration=configuration, + ) + + alias_dataset = await Dataset.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + named_dataset = await Dataset.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + await default_dataset.push_data({'type': 'default', 'data': 'should_persist'}) + await alias_dataset.push_data({'type': 'alias', 'data': 'should_persist'}) + await named_dataset.push_data({'type': 'named', 'data': 'should_persist'}) + + # Verify data was added + default_data = await default_dataset.get_data() + alias_data = await alias_dataset.get_data() + named_data = await named_dataset.get_data() + + assert len(default_data.items) == 1 + assert len(alias_data.items) == 1 + assert len(named_data.items) == 1 + + # Verify that default and alias storages are unnamed + default_metadata = await default_dataset.get_metadata() + alias_metadata = await alias_dataset.get_metadata() + named_metadata = await named_dataset.get_metadata() + + assert default_metadata.name is None + assert alias_metadata.name is None + assert named_metadata.name == 'purge_test_named' + + # Clear storage cache to simulate "reopening" storages + service_locator.storage_instance_manager.clear_cache() + + # Now "reopen" all storages + default_dataset_2 = await Dataset.open( + storage_client=storage_client, + configuration=configuration, + ) + alias_dataset_2 = await Dataset.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + named_dataset_2 = await Dataset.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + # Check the data after purge + default_data_after = await default_dataset_2.get_data() + alias_data_after = await alias_dataset_2.get_data() + named_data_after = await named_dataset_2.get_data() + + # All storages should retain data (not purged) + assert len(default_data_after.items) == 1 + assert len(alias_data_after.items) == 1 + assert len(named_data_after.items) == 1 + + assert default_data_after.items[0]['data'] == 'should_persist' + assert alias_data_after.items[0]['data'] == 'should_persist' + assert named_data_after.items[0]['data'] == 'should_persist' + + # Clean up + await default_dataset_2.drop() + await alias_dataset_2.drop() + await 
named_dataset_2.drop() diff --git a/tests/unit/storages/test_key_value_store.py b/tests/unit/storages/test_key_value_store.py index 122de8b157..00d9db5b05 100644 --- a/tests/unit/storages/test_key_value_store.py +++ b/tests/unit/storages/test_key_value_store.py @@ -8,6 +8,7 @@ import pytest +from crawlee import service_locator from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient from crawlee.storages import KeyValueStore @@ -94,7 +95,7 @@ async def test_open_with_id_and_name( configuration: Configuration, ) -> None: """Test that open() raises an error when both id and name are provided.""" - with pytest.raises(ValueError, match=r'Only one of "id" or "name" can be specified'): + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): await KeyValueStore.open( id='some-id', name='some-name', @@ -598,3 +599,528 @@ async def test_record_exists_after_purge(kvs: KeyValueStore) -> None: # Should no longer exist assert await kvs.record_exists('key1') is False assert await kvs.record_exists('key2') is False + + +async def test_open_with_alias( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test opening key-value stores with alias parameter for NDU functionality.""" + # Create key-value stores with different aliases + kvs_1 = await KeyValueStore.open( + alias='test_alias_1', + storage_client=storage_client, + configuration=configuration, + ) + kvs_2 = await KeyValueStore.open( + alias='test_alias_2', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify they have different IDs but no names (unnamed) + assert kvs_1.id != kvs_2.id + assert kvs_1.name is None + assert kvs_2.name is None + + # Add different data to each + await kvs_1.set_value('source', 'alias_1') + await kvs_2.set_value('source', 'alias_2') + + # Verify data isolation + value_1 = await kvs_1.get_value('source') + value_2 = await kvs_2.get_value('source') + + assert value_1 == 'alias_1' + assert value_2 == 'alias_2' + + # Clean up + await kvs_1.drop() + await kvs_2.drop() + + +async def test_alias_caching( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that key-value stores with same alias return same instance (cached).""" + # Open kvs with alias + kvs_1 = await KeyValueStore.open( + alias='cache_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Open again with same alias + kvs_2 = await KeyValueStore.open( + alias='cache_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be same instance + assert kvs_1 is kvs_2 + assert kvs_1.id == kvs_2.id + + # Clean up + await kvs_1.drop() + + +async def test_alias_with_id_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing both alias and id raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await KeyValueStore.open( + id='some-id', + alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_name_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing both alias and name raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await KeyValueStore.open( + name='some-name', + 
alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_special_characters( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test alias functionality with special characters.""" + special_aliases = [ + 'alias-with-dashes', + 'alias_with_underscores', + 'alias.with.dots', + 'alias123with456numbers', + 'CamelCaseAlias', + ] + + stores = [] + for alias in special_aliases: + kvs = await KeyValueStore.open( + alias=alias, + storage_client=storage_client, + configuration=configuration, + ) + stores.append(kvs) + + # Add data with the alias as identifier + await kvs.set_value('alias_used', alias) + await kvs.set_value('test', 'special_chars') + + # Verify all work correctly + for i, kvs in enumerate(stores): + assert await kvs.get_value('alias_used') == special_aliases[i] + assert await kvs.get_value('test') == 'special_chars' + + # Clean up + for kvs in stores: + await kvs.drop() + + +async def test_alias_key_operations( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that key operations work correctly with alias stores.""" + kvs = await KeyValueStore.open( + alias='key_ops_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Test setting multiple keys + test_data = { + 'key1': {'data': 'value1', 'number': 1}, + 'key2': 'simple string value', + 'key3': [1, 2, 3, 4, 5], + 'key4': None, + } + + for key, value in test_data.items(): + await kvs.set_value(key, value) + + # Test getting all keys + keys = await kvs.list_keys() + key_names = [k.key for k in keys] + assert len(keys) == 4 + for key in test_data: + assert key in key_names + + # Test record_exists + for key in test_data: + assert await kvs.record_exists(key) is True + assert await kvs.record_exists('nonexistent') is False + + # Test iteration + collected_keys = [key async for key in kvs.iterate_keys()] + assert len(collected_keys) == 4 + + # Test deletion + await kvs.delete_value('key2') + assert await kvs.record_exists('key2') is False + assert await kvs.get_value('key2') is None + + # Verify other keys still exist + remaining_keys = await kvs.list_keys() + assert len(remaining_keys) == 3 + + # Clean up + await kvs.drop() + + +async def test_named_vs_alias_conflict_detection() -> None: + """Test that conflicts between named and alias storages are detected.""" + # Test 1: Create named storage first, then try alias with same name + named_kvs = await KeyValueStore.open(name='conflict_test') + assert named_kvs.name == 'conflict_test' + + # Try to create alias with same name - should raise error + with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'): + await KeyValueStore.open(alias='conflict_test') + + # Clean up + await named_kvs.drop() + + # Test 2: Create alias first, then try named with same name + alias_kvs = await KeyValueStore.open(alias='conflict_test2') + assert alias_kvs.name is None # Alias storages have no name + + # Try to create named with same name - should raise error + with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'): + await KeyValueStore.open(name='conflict_test2') + + # Clean up + await alias_kvs.drop() + + # Test 3: Different names should work fine + named_kvs_ok = await KeyValueStore.open(name='different_name') + alias_kvs_ok = await KeyValueStore.open(alias='different_alias') + + assert named_kvs_ok.name == 'different_name' + assert alias_kvs_ok.name is None + + 
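+    # The conflict check is keyed on the identifier string itself, so a name
+    # and an alias only collide when they use the same identifier.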
# Clean up + await named_kvs_ok.drop() + await alias_kvs_ok.drop() + + +async def test_alias_parameter( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test key-value store creation and operations with alias parameter.""" + # Create kvs with alias + alias_kvs = await KeyValueStore.open( + alias='test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify alias kvs properties + assert alias_kvs.id is not None + assert alias_kvs.name is None # Alias storages should be unnamed + + # Test data operations + await alias_kvs.set_value('test_key', {'type': 'alias', 'value': 1}) + value = await alias_kvs.get_value('test_key') + assert value['type'] == 'alias' + + await alias_kvs.drop() + + +async def test_alias_vs_named_isolation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that alias and named key-value stores with same identifier are isolated.""" + # Create named kvs + named_kvs = await KeyValueStore.open( + name='test_identifier', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify named kvs + assert named_kvs.name == 'test_identifier' + await named_kvs.set_value('type', 'named') + + # Clean up named kvs first + await named_kvs.drop() + + # Now create alias kvs with same identifier (should work after cleanup) + alias_kvs = await KeyValueStore.open( + alias='test_identifier', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be different instance + assert alias_kvs.name is None + await alias_kvs.set_value('type', 'alias') + + # Verify alias data + alias_value = await alias_kvs.get_value('type') + assert alias_value == 'alias' + + await alias_kvs.drop() + + +async def test_default_vs_alias_default_equivalence( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that default key-value store and alias='default' are equivalent.""" + # Open default kvs + default_kvs = await KeyValueStore.open( + storage_client=storage_client, + configuration=configuration, + ) + + # Open alias='default' kvs + alias_default_kvs = await KeyValueStore.open( + alias='default', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be the same + assert default_kvs.id == alias_default_kvs.id + assert default_kvs.name is None + assert alias_default_kvs.name is None + + # Data should be shared + await default_kvs.set_value('source', 'default') + value = await alias_default_kvs.get_value('source') + assert value == 'default' + + await default_kvs.drop() + + +async def test_alias_parameter_validation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test alias parameter validation.""" + # Should not allow both name and alias + with pytest.raises(ValueError, match=r'Only one of'): + await KeyValueStore.open( + name='test', + alias='test', + storage_client=storage_client, + configuration=configuration, + ) + + # Valid alias should work + alias_kvs = await KeyValueStore.open( + alias='valid_alias', + storage_client=storage_client, + configuration=configuration, + ) + assert alias_kvs.name is None + await alias_kvs.drop() + + +async def test_multiple_alias_isolation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that different aliases create separate key-value stores.""" + kvs_stores = [] + + for i in range(3): + kvs = await KeyValueStore.open( + alias=f'alias_{i}', + storage_client=storage_client, + configuration=configuration, + 
) + await kvs.set_value('alias', f'alias_{i}') + await kvs.set_value('index', i) + kvs_stores.append(kvs) + + # All should be different + for i in range(3): + for j in range(i + 1, 3): + assert kvs_stores[i].id != kvs_stores[j].id + + # Verify data isolation + for i, kvs in enumerate(kvs_stores): + alias_value = await kvs.get_value('alias') + index_value = await kvs.get_value('index') + assert alias_value == f'alias_{i}' + # For memory storage, value is preserved as int; for filesystem it's converted to string + assert index_value == i or index_value == str(i) + await kvs.drop() + + +async def test_purge_on_start_enabled(storage_client: StorageClient) -> None: + """Test purge behavior when purge_on_start=True: named storages retain data, unnamed storages are purged.""" + + # Skip this test for memory storage since it doesn't persist data between client instances. + if storage_client.__class__.__name__ == 'MemoryStorageClient': + pytest.skip('Memory storage does not persist data between client instances.') + + configuration = Configuration(purge_on_start=True) + + # First, create all storage types with purge enabled and add data. + default_kvs = await KeyValueStore.open( + storage_client=storage_client, + configuration=configuration, + ) + + alias_kvs = await KeyValueStore.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + named_kvs = await KeyValueStore.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + await default_kvs.set_value(key='data', value='should_be_purged') + await alias_kvs.set_value(key='data', value='should_be_purged') + await named_kvs.set_value(key='data', value='should_persist') + + # Verify data was added + default_data = await default_kvs.get_value(key='data') + alias_data = await alias_kvs.get_value(key='data') + named_data = await named_kvs.get_value(key='data') + + assert default_data == 'should_be_purged' + assert alias_data == 'should_be_purged' + assert named_data == 'should_persist' + + # Verify that default and alias storages are unnamed + default_metadata = await default_kvs.get_metadata() + alias_metadata = await alias_kvs.get_metadata() + named_metadata = await named_kvs.get_metadata() + + assert default_metadata.name is None + assert alias_metadata.name is None + assert named_metadata.name == 'purge_test_named' + + # Clear storage cache to simulate "reopening" storages + service_locator.storage_instance_manager.clear_cache() + + # Now "reopen" all storages + default_kvs_2 = await KeyValueStore.open( + storage_client=storage_client, + configuration=configuration, + ) + alias_kvs_2 = await KeyValueStore.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + named_kvs_2 = await KeyValueStore.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + # Check the data after purge + default_data_after = await default_kvs_2.get_value(key='data') + alias_data_after = await alias_kvs_2.get_value(key='data') + named_data_after = await named_kvs_2.get_value(key='data') + + # Unnamed storages (alias and default) should be purged (data removed) + assert default_data_after is None + assert alias_data_after is None + + # Named storage should retain data (not purged) + assert named_data_after == 'should_persist' + + # Clean up + await named_kvs_2.drop() + await alias_kvs_2.drop() + await default_kvs_2.drop() + + +async def test_purge_on_start_disabled(storage_client: 
StorageClient) -> None: + """Test purge behavior when purge_on_start=False: all storages retain data regardless of type.""" + + # Skip this test for memory storage since it doesn't persist data between client instances. + if storage_client.__class__.__name__ == 'MemoryStorageClient': + pytest.skip('Memory storage does not persist data between client instances.') + + configuration = Configuration(purge_on_start=False) + + # First, create all storage types with purge disabled and add data. + default_kvs = await KeyValueStore.open( + storage_client=storage_client, + configuration=configuration, + ) + + alias_kvs = await KeyValueStore.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + named_kvs = await KeyValueStore.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + await default_kvs.set_value('data', 'should_persist') + await alias_kvs.set_value('data', 'should_persist') + await named_kvs.set_value('data', 'should_persist') + + # Verify data was added + default_data = await default_kvs.get_value('data') + alias_data = await alias_kvs.get_value('data') + named_data = await named_kvs.get_value('data') + + assert default_data == 'should_persist' + assert alias_data == 'should_persist' + assert named_data == 'should_persist' + + # Clear storage cache to simulate "reopening" storages + service_locator.storage_instance_manager.clear_cache() + + # Now "reopen" all storages + default_kvs_2 = await KeyValueStore.open( + storage_client=storage_client, + configuration=configuration, + ) + alias_kvs_2 = await KeyValueStore.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + named_kvs_2 = await KeyValueStore.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + # Check the data after reopen + default_data_after = await default_kvs_2.get_value('data') + alias_data_after = await alias_kvs_2.get_value('data') + named_data_after = await named_kvs_2.get_value('data') + + # All storages should retain data when purge is disabled + assert default_data_after == 'should_persist' + assert alias_data_after == 'should_persist' + assert named_data_after == 'should_persist' + + # Clean up + await named_kvs_2.drop() + await alias_kvs_2.drop() + await default_kvs_2.drop() diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 135e732b33..ed733a866c 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -99,7 +99,7 @@ async def test_open_with_id_and_name( configuration: Configuration, ) -> None: """Test that open() raises an error when both id and name are provided.""" - with pytest.raises(ValueError, match=r'Only one of "id" or "name" can be specified'): + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): await RequestQueue.open( id='some-id', name='some-name', @@ -642,3 +642,681 @@ async def test_purge( # Clean up await rq.drop() + + +async def test_open_with_alias( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test opening request queues with alias parameter for NDU functionality.""" + # Create request queues with different aliases + rq_1 = await RequestQueue.open( + alias='test_alias_1', + storage_client=storage_client, + configuration=configuration, + ) + rq_2 = await RequestQueue.open( + alias='test_alias_2', + 
storage_client=storage_client, + configuration=configuration, + ) + + # Verify they have different IDs but no names (unnamed) + assert rq_1.id != rq_2.id + assert rq_1.name is None + assert rq_2.name is None + + # Add different requests to each + await rq_1.add_request('https://example1.com') + await rq_2.add_request('https://example2.com') + + # Verify data isolation + request_1 = await rq_1.fetch_next_request() + request_2 = await rq_2.fetch_next_request() + + assert request_1 is not None + assert request_2 is not None + assert request_1.url == 'https://example1.com' + assert request_2.url == 'https://example2.com' + + # Clean up + await rq_1.drop() + await rq_2.drop() + + +async def test_alias_caching( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that request queues with same alias return same instance (cached).""" + # Open rq with alias + rq_1 = await RequestQueue.open( + alias='cache_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Open again with same alias + rq_2 = await RequestQueue.open( + alias='cache_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be same instance + assert rq_1 is rq_2 + assert rq_1.id == rq_2.id + + # Clean up + await rq_1.drop() + + +async def test_alias_with_id_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing both alias and id raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await RequestQueue.open( + id='some-id', + alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_name_error( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that providing both alias and name raises error.""" + with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'): + await RequestQueue.open( + name='some-name', + alias='some-alias', + storage_client=storage_client, + configuration=configuration, + ) + + +async def test_alias_with_special_characters( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test alias functionality with special characters.""" + special_aliases = [ + 'alias-with-dashes', + 'alias_with_underscores', + 'alias.with.dots', + 'alias123with456numbers', + 'CamelCaseAlias', + ] + + queues = [] + for alias in special_aliases: + rq = await RequestQueue.open( + alias=alias, + storage_client=storage_client, + configuration=configuration, + ) + queues.append(rq) + + # Add request with the alias as identifier in URL + await rq.add_request(f'https://example.com/{alias}') + + # Verify all work correctly + for i, rq in enumerate(queues): + request = await rq.fetch_next_request() + assert request is not None + assert f'/{special_aliases[i]}' in request.url + + # Clean up + for rq in queues: + await rq.drop() + + +async def test_alias_request_operations( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that request operations work correctly with alias queues.""" + rq = await RequestQueue.open( + alias='request_ops_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Test adding multiple requests + urls = [ + 'https://example.com/page1', + 'https://example.com/page2', + 'https://example.com/page3', + ] + + for url in urls: + result = await rq.add_request(url) + assert 
result.was_already_present is False + + # Test queue metadata + metadata = await rq.get_metadata() + assert metadata.total_request_count == 3 + assert metadata.pending_request_count == 3 + assert metadata.handled_request_count == 0 + + # Test fetching and handling requests + processed_urls = [] + while not await rq.is_empty(): + request = await rq.fetch_next_request() + if request: + processed_urls.append(request.url) + await rq.mark_request_as_handled(request) + + # Verify all requests were processed + assert len(processed_urls) == 3 + assert set(processed_urls) == set(urls) + + # Verify final state + metadata = await rq.get_metadata() + assert metadata.pending_request_count == 0 + assert metadata.handled_request_count == 3 + assert await rq.is_empty() is True + + # Clean up + await rq.drop() + + +async def test_alias_forefront_operations( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test forefront operations work correctly with alias queues.""" + rq = await RequestQueue.open( + alias='forefront_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Add normal requests + await rq.add_request('https://example.com/normal1') + await rq.add_request('https://example.com/normal2') + + # Add priority request to forefront + await rq.add_request('https://example.com/priority', forefront=True) + + # Priority request should come first + priority_request = await rq.fetch_next_request() + assert priority_request is not None + assert priority_request.url == 'https://example.com/priority' + + # Then normal requests + normal_request = await rq.fetch_next_request() + assert normal_request is not None + assert normal_request.url == 'https://example.com/normal1' + + # Clean up + await rq.drop() + + +async def test_alias_batch_operations( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test batch operations work correctly with alias queues.""" + rq = await RequestQueue.open( + alias='batch_test', + storage_client=storage_client, + configuration=configuration, + ) + + # Test batch adding + batch_urls = [ + 'https://example.com/batch1', + 'https://example.com/batch2', + 'https://example.com/batch3', + ] + + await rq.add_requests(batch_urls) + + # Wait for background processing + await asyncio.sleep(0.1) + + # Verify all requests were added + metadata = await rq.get_metadata() + assert metadata.total_request_count == 3 + + # Clean up + await rq.drop() + + +async def test_named_vs_alias_conflict_detection() -> None: + """Test that conflicts between named and alias storages are detected.""" + # Test 1: Create named storage first, then try alias with same name + named_rq = await RequestQueue.open(name='conflict_test') + assert named_rq.name == 'conflict_test' + + # Try to create alias with same name - should raise error + with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'): + await RequestQueue.open(alias='conflict_test') + + # Clean up + await named_rq.drop() + + # Test 2: Create alias first, then try named with same name + alias_rq = await RequestQueue.open(alias='conflict_test2') + assert alias_rq.name is None # Alias storages have no name + + # Try to create named with same name - should raise error + with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'): + await RequestQueue.open(name='conflict_test2') + + # Clean up + await alias_rq.drop() + + # Test 3: Different names should work fine + named_rq_ok = await 
RequestQueue.open(name='different_name') + alias_rq_ok = await RequestQueue.open(alias='different_alias') + + assert named_rq_ok.name == 'different_name' + assert alias_rq_ok.name is None + + # Clean up + await named_rq_ok.drop() + await alias_rq_ok.drop() + + +async def test_alias_parameter( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test request queue creation and operations with alias parameter.""" + # Create request queue with alias + alias_rq = await RequestQueue.open( + alias='test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify alias request queue properties + assert alias_rq.id is not None + assert alias_rq.name is None # Alias storages should be unnamed + + # Test data operations + await alias_rq.add_request('https://example.com/alias') + metadata = await alias_rq.get_metadata() + assert metadata.pending_request_count == 1 + + await alias_rq.drop() + + +async def test_alias_vs_named_isolation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that alias and named request queues with same identifier are isolated.""" + # Create named request queue + named_rq = await RequestQueue.open( + name='test_identifier', + storage_client=storage_client, + configuration=configuration, + ) + + # Verify named request queue + assert named_rq.name == 'test_identifier' + await named_rq.add_request('https://named.example.com') + + # Clean up named request queue first + await named_rq.drop() + + # Now create alias request queue with same identifier (should work after cleanup) + alias_rq = await RequestQueue.open( + alias='test_identifier', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be different instance + assert alias_rq.name is None + await alias_rq.add_request('https://alias.example.com') + + # Verify alias data + alias_request = await alias_rq.fetch_next_request() + assert alias_request is not None + assert alias_request.url == 'https://alias.example.com' + + await alias_rq.drop() + + +async def test_default_vs_alias_default_equivalence( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that default request queue and alias='default' are equivalent.""" + # Open default request queue + default_rq = await RequestQueue.open( + storage_client=storage_client, + configuration=configuration, + ) + + # Open alias='default' request queue + alias_default_rq = await RequestQueue.open( + alias='default', + storage_client=storage_client, + configuration=configuration, + ) + + # Should be the same + assert default_rq.id == alias_default_rq.id + assert default_rq.name is None + assert alias_default_rq.name is None + + # Data should be shared + await default_rq.add_request('https://default.example.com') + metadata = await alias_default_rq.get_metadata() + assert metadata.pending_request_count == 1 + + await default_rq.drop() + + +async def test_alias_parameter_validation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test alias parameter validation.""" + # Should not allow both name and alias + with pytest.raises(ValueError, match=r'Only one of'): + await RequestQueue.open( + name='test', + alias='test', + storage_client=storage_client, + configuration=configuration, + ) + + # Valid alias should work + alias_rq = await RequestQueue.open( + alias='valid_alias', + storage_client=storage_client, + configuration=configuration, + ) + assert alias_rq.name is None + await alias_rq.drop() + + +async 
def test_multiple_alias_isolation( + storage_client: StorageClient, + configuration: Configuration, +) -> None: + """Test that different aliases create separate request queues.""" + request_queues = [] + + for i in range(3): + rq = await RequestQueue.open( + alias=f'alias_{i}', + storage_client=storage_client, + configuration=configuration, + ) + await rq.add_request(f'https://example.com/alias_{i}') + request_queues.append(rq) + + # All should be different + for i in range(3): + for j in range(i + 1, 3): + assert request_queues[i].id != request_queues[j].id + + # Verify data isolation + for i, rq in enumerate(request_queues): + request = await rq.fetch_next_request() + assert request is not None + assert request.url == f'https://example.com/alias_{i}' + await rq.drop() + + +async def test_purge_on_start_enabled(storage_client: StorageClient) -> None: + """Test purge behavior when purge_on_start=True: named storages retain data, unnamed storages are purged.""" + + # Skip this test for memory storage since it doesn't persist data between client instances. + if storage_client.__class__.__name__ == 'MemoryStorageClient': + pytest.skip('Memory storage does not persist data between client instances.') + + configuration = Configuration(purge_on_start=True) + + # First, create all storage types with purge enabled and add data. + default_rq = await RequestQueue.open( + storage_client=storage_client, + configuration=configuration, + ) + + alias_rq = await RequestQueue.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + + named_rq = await RequestQueue.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + await default_rq.add_requests( + [ + 'https://default.example.com/1', + 'https://default.example.com/2', + 'https://default.example.com/3', + ] + ) + await alias_rq.add_requests( + [ + 'https://alias.example.com/1', + 'https://alias.example.com/2', + 'https://alias.example.com/3', + ] + ) + await named_rq.add_requests( + [ + 'https://named.example.com/1', + 'https://named.example.com/2', + 'https://named.example.com/3', + ] + ) + + default_request = await default_rq.fetch_next_request() + alias_request = await alias_rq.fetch_next_request() + named_request = await named_rq.fetch_next_request() + + assert default_request is not None + assert alias_request is not None + assert named_request is not None + + await default_rq.mark_request_as_handled(default_request) + await alias_rq.mark_request_as_handled(alias_request) + await named_rq.mark_request_as_handled(named_request) + + # Verify data was added + default_metadata = await default_rq.get_metadata() + alias_metadata = await alias_rq.get_metadata() + named_metadata = await named_rq.get_metadata() + + assert default_metadata.pending_request_count == 2 + assert alias_metadata.pending_request_count == 2 + assert named_metadata.pending_request_count == 2 + + assert default_metadata.handled_request_count == 1 + assert alias_metadata.handled_request_count == 1 + assert named_metadata.handled_request_count == 1 + + assert default_metadata.total_request_count == 3 + assert alias_metadata.total_request_count == 3 + assert named_metadata.total_request_count == 3 + + # Verify that default and alias storages are unnamed + assert default_metadata.name is None + assert alias_metadata.name is None + assert named_metadata.name == 'purge_test_named' + + # Clear storage cache to simulate "reopening" storages + service_locator.storage_instance_manager.clear_cache() + 
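+    # Note: purging removes a queue's pending requests, while the handled and
+    # total counters in its metadata survive, per the assertions below.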
+ # Now "reopen" all storages + default_rq_2 = await RequestQueue.open( + storage_client=storage_client, + configuration=configuration, + ) + alias_rq_2 = await RequestQueue.open( + alias='purge_test_alias', + storage_client=storage_client, + configuration=configuration, + ) + named_rq_2 = await RequestQueue.open( + name='purge_test_named', + storage_client=storage_client, + configuration=configuration, + ) + + # Check the data after purge + default_metadata_after = await default_rq_2.get_metadata() + alias_metadata_after = await alias_rq_2.get_metadata() + named_metadata_after = await named_rq_2.get_metadata() + + # Unnamed storages (alias and default) should be purged (data removed) + assert default_metadata_after.pending_request_count == 0 + assert alias_metadata_after.pending_request_count == 0 + assert named_metadata_after.pending_request_count == 2 + + assert default_metadata_after.handled_request_count == 1 + assert alias_metadata_after.handled_request_count == 1 + assert named_metadata_after.handled_request_count == 1 + + assert default_metadata_after.total_request_count == 3 + assert alias_metadata_after.total_request_count == 3 + assert named_metadata_after.total_request_count == 3 + + # Clean up + await named_rq_2.drop() + await alias_rq_2.drop() + await default_rq_2.drop() + + +async def test_purge_on_start_disabled(storage_client: StorageClient) -> None: + """Test purge behavior when purge_on_start=False: all storages retain data regardless of type.""" + + # Skip this test for memory storage since it doesn't persist data between client instances. + if storage_client.__class__.__name__ == 'MemoryStorageClient': + pytest.skip('Memory storage does not persist data between client instances.') + + configuration = Configuration(purge_on_start=False) + + # First, create all storage types with purge disabled and add data. 
+    default_rq = await RequestQueue.open(
+        storage_client=storage_client,
+        configuration=configuration,
+    )
+
+    alias_rq = await RequestQueue.open(
+        alias='purge_test_alias',
+        storage_client=storage_client,
+        configuration=configuration,
+    )
+
+    named_rq = await RequestQueue.open(
+        name='purge_test_named',
+        storage_client=storage_client,
+        configuration=configuration,
+    )
+
+    await default_rq.add_requests(
+        [
+            'https://default.example.com/1',
+            'https://default.example.com/2',
+            'https://default.example.com/3',
+        ]
+    )
+    await alias_rq.add_requests(
+        [
+            'https://alias.example.com/1',
+            'https://alias.example.com/2',
+            'https://alias.example.com/3',
+        ]
+    )
+    await named_rq.add_requests(
+        [
+            'https://named.example.com/1',
+            'https://named.example.com/2',
+            'https://named.example.com/3',
+        ]
+    )
+
+    default_request = await default_rq.fetch_next_request()
+    alias_request = await alias_rq.fetch_next_request()
+    named_request = await named_rq.fetch_next_request()
+
+    assert default_request is not None
+    assert alias_request is not None
+    assert named_request is not None
+
+    await default_rq.mark_request_as_handled(default_request)
+    await alias_rq.mark_request_as_handled(alias_request)
+    await named_rq.mark_request_as_handled(named_request)
+
+    # Verify data was added
+    default_metadata = await default_rq.get_metadata()
+    alias_metadata = await alias_rq.get_metadata()
+    named_metadata = await named_rq.get_metadata()
+
+    assert default_metadata.pending_request_count == 2
+    assert alias_metadata.pending_request_count == 2
+    assert named_metadata.pending_request_count == 2
+
+    assert default_metadata.handled_request_count == 1
+    assert alias_metadata.handled_request_count == 1
+    assert named_metadata.handled_request_count == 1
+
+    assert default_metadata.total_request_count == 3
+    assert alias_metadata.total_request_count == 3
+    assert named_metadata.total_request_count == 3
+
+    # Verify that default and alias storages are unnamed
+    assert default_metadata.name is None
+    assert alias_metadata.name is None
+    assert named_metadata.name == 'purge_test_named'
+
+    # Clear storage cache to simulate "reopening" storages
+    service_locator.storage_instance_manager.clear_cache()
+
+    # Now "reopen" all storages
+    default_rq_2 = await RequestQueue.open(
+        storage_client=storage_client,
+        configuration=configuration,
+    )
+    alias_rq_2 = await RequestQueue.open(
+        alias='purge_test_alias',
+        storage_client=storage_client,
+        configuration=configuration,
+    )
+    named_rq_2 = await RequestQueue.open(
+        name='purge_test_named',
+        storage_client=storage_client,
+        configuration=configuration,
+    )
+
+    # Check the data after reopen
+    default_metadata_after = await default_rq_2.get_metadata()
+    alias_metadata_after = await alias_rq_2.get_metadata()
+    named_metadata_after = await named_rq_2.get_metadata()
+
+    # All queues should retain their requests when purge is disabled
+    assert default_metadata_after.pending_request_count == 2
+    assert alias_metadata_after.pending_request_count == 2
+    assert named_metadata_after.pending_request_count == 2
+
+    assert default_metadata_after.handled_request_count == 1
+    assert alias_metadata_after.handled_request_count == 1
+    assert named_metadata_after.handled_request_count == 1
+
+    assert default_metadata_after.total_request_count == 3
+    assert alias_metadata_after.total_request_count == 3
+    assert named_metadata_after.total_request_count == 3
+
+    # Clean up
+    await named_rq_2.drop()
+    await alias_rq_2.drop()
+    await default_rq_2.drop()

From 
eab68bc93b57eded5b525cbf3554c6623776f64c Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 10 Sep 2025 09:32:01 +0200 Subject: [PATCH 2/8] Improve FS clients creation --- .../_file_system/_dataset_client.py | 82 ++++++------------- .../_file_system/_key_value_store_client.py | 75 +++++------------ .../_file_system/_request_queue_client.py | 72 +++++----------- 3 files changed, 66 insertions(+), 163 deletions(-) diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index 152bb8a234..5222be4eb5 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -56,9 +56,8 @@ def __init__( self, *, metadata: DatasetMetadata, - storage_dir: Path, + path_to_dataset: Path, lock: asyncio.Lock, - directory_name: str | None = None, ) -> None: """Initialize a new instance. @@ -66,11 +65,8 @@ def __init__( """ self._metadata = metadata - self._storage_dir = storage_dir - """The base directory where the storage data are being persisted.""" - - self._directory_name = directory_name - """The directory name to use for this dataset. If None, uses metadata.name or default.""" + self._path_to_dataset = path_to_dataset + """The full path to the dataset directory.""" self._lock = lock """A lock to ensure that only one operation is performed at a time.""" @@ -82,14 +78,7 @@ async def get_metadata(self) -> DatasetMetadata: @property def path_to_dataset(self) -> Path: """The full path to the dataset directory.""" - # Use the explicit directory name if provided, otherwise fall back to metadata.name or default - if self._directory_name is not None: - return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name - - if self._metadata.name is None: - return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT - - return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name + return self._path_to_dataset @property def path_to_metadata(self) -> Path: @@ -124,12 +113,12 @@ async def open( ValueError: If a dataset with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. 
""" - # Validate parameters - exactly one of name or alias should be provided (or neither for default) - if name is not None and alias is not None: - raise ValueError('Cannot specify both name and alias parameters') + # Validate parameters + specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - storage_dir = Path(configuration.storage_dir) - dataset_base_path = storage_dir / cls._STORAGE_SUBDIR + dataset_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR if not dataset_base_path.exists(): await asyncio.to_thread(dataset_base_path.mkdir, parents=True, exist_ok=True) @@ -141,21 +130,20 @@ async def open( if not dataset_dir.is_dir(): continue - metadata_path = dataset_dir / METADATA_FILENAME - if not metadata_path.exists(): + path_to_metadata = dataset_dir / METADATA_FILENAME + if not path_to_metadata.exists(): continue try: - file = await asyncio.to_thread(metadata_path.open) + file = await asyncio.to_thread(path_to_metadata.open) try: file_content = json.load(file) metadata = DatasetMetadata(**file_content) if metadata.id == id: client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_dataset=dataset_base_path / dataset_dir, lock=asyncio.Lock(), - directory_name=dataset_dir.name, # Use the actual directory name ) await client._update_metadata(update_accessed_at=True) found = True @@ -170,48 +158,29 @@ async def open( # Get a new instance by name or alias. else: - # Determine the directory name and metadata name based on whether this is a named or alias storage - if alias is not None: - # For alias storages, use the alias as directory name and set metadata.name to None - # Special case: alias='default' should use the same directory as default storage - directory_name = None if alias == 'default' else alias - actual_name = None - elif name is not None: - # For named storages, use the name as both directory name and metadata.name - directory_name = name - actual_name = name - else: - # For default storage (no name or alias), use None for both - same as alias='default' - directory_name = None - actual_name = None - - dataset_path = ( - dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT - if directory_name is None - else dataset_base_path / directory_name - ) - metadata_path = dataset_path / METADATA_FILENAME + dataset_dir = Path(name) if name else Path(alias) if alias else Path('default') + path_to_dataset = dataset_base_path / dataset_dir + path_to_metadata = path_to_dataset / METADATA_FILENAME # If the dataset directory exists, reconstruct the client from the metadata file. - if dataset_path.exists() and metadata_path.exists(): - file = await asyncio.to_thread(open, metadata_path) + if path_to_dataset.exists() and path_to_metadata.exists(): + file = await asyncio.to_thread(open, path_to_metadata) try: file_content = json.load(file) finally: await asyncio.to_thread(file.close) try: metadata = DatasetMetadata(**file_content) - # For aliases, ensure the metadata.name is None - if alias is not None: - metadata = metadata.model_copy(update={'name': None}) except ValidationError as exc: - raise ValueError(f'Invalid metadata file for dataset "{name}"') from exc + raise ValueError(f'Invalid metadata file for dataset "{name or alias}"') from exc + + # Update metadata name to match the resolution. 
+ metadata.name = name client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_dataset=path_to_dataset, lock=asyncio.Lock(), - directory_name=directory_name, ) await client._update_metadata(update_accessed_at=True) @@ -221,7 +190,7 @@ async def open( now = datetime.now(timezone.utc) metadata = DatasetMetadata( id=crypto_random_object_id(), - name=actual_name, # Use actual_name which will be None for aliases + name=name, created_at=now, accessed_at=now, modified_at=now, @@ -229,9 +198,8 @@ async def open( ) client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_dataset=path_to_dataset, lock=asyncio.Lock(), - directory_name=directory_name, ) await client._update_metadata() diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index d8fc21d17b..e87fc8ec32 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -55,9 +55,8 @@ def __init__( self, *, metadata: KeyValueStoreMetadata, - storage_dir: Path, + path_to_kvs: Path, lock: asyncio.Lock, - directory_name: str | None = None, ) -> None: """Initialize a new instance. @@ -65,11 +64,8 @@ def __init__( """ self._metadata = metadata - self._storage_dir = storage_dir - """The base directory where the storage data are being persisted.""" - - self._directory_name = directory_name - """The directory name to use for this key-value store. If None, uses metadata.name or default.""" + self._path_to_kvs = path_to_kvs + """The full path to the key-value store directory.""" self._lock = lock """A lock to ensure that only one operation is performed at a time.""" @@ -81,14 +77,7 @@ async def get_metadata(self) -> KeyValueStoreMetadata: @property def path_to_kvs(self) -> Path: """The full path to the key-value store directory.""" - # Use the explicit directory name if provided, otherwise fall back to metadata.name or default - if self._directory_name is not None: - return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name - - if self._metadata.name is None: - return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT - - return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name + return self._path_to_kvs @property def path_to_metadata(self) -> Path: @@ -127,8 +116,7 @@ async def open( if name is not None and alias is not None: raise ValueError('Cannot specify both name and alias parameters') - storage_dir = Path(configuration.storage_dir) - kvs_base_path = storage_dir / cls._STORAGE_SUBDIR + kvs_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR if not kvs_base_path.exists(): await asyncio.to_thread(kvs_base_path.mkdir, parents=True, exist_ok=True) @@ -140,21 +128,20 @@ async def open( if not kvs_dir.is_dir(): continue - metadata_path = kvs_dir / METADATA_FILENAME - if not metadata_path.exists(): + path_to_metadata = kvs_dir / METADATA_FILENAME + if not path_to_metadata.exists(): continue try: - file = await asyncio.to_thread(metadata_path.open) + file = await asyncio.to_thread(path_to_metadata.open) try: file_content = json.load(file) metadata = KeyValueStoreMetadata(**file_content) if metadata.id == id: client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_kvs=kvs_base_path / kvs_dir, lock=asyncio.Lock(), - directory_name=kvs_dir.name, # Use the actual directory name ) await client._update_metadata(update_accessed_at=True) found = True @@ -169,48 +156,29 @@ 
async def open( # Get a new instance by name or alias. else: - # Determine the directory name and metadata name based on whether this is a named or alias storage - if alias is not None: - # For alias storages, use the alias as directory name and set metadata.name to None - # Special case: alias='default' should use the same directory as default storage - directory_name = None if alias == 'default' else alias - actual_name = None - elif name is not None: - # For named storages, use the name as both directory name and metadata.name - directory_name = name - actual_name = name - else: - # For default storage (no name or alias), use None for both - same as alias='default' - directory_name = None - actual_name = None - - kvs_path = ( - kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT - if directory_name is None - else kvs_base_path / directory_name - ) - metadata_path = kvs_path / METADATA_FILENAME + kvs_dir = Path(name) if name else Path(alias) if alias else Path('default') + path_to_kvs = kvs_base_path / kvs_dir + path_to_metadata = path_to_kvs / METADATA_FILENAME # If the key-value store directory exists, reconstruct the client from the metadata file. - if kvs_path.exists() and metadata_path.exists(): - file = await asyncio.to_thread(open, metadata_path) + if path_to_kvs.exists() and path_to_metadata.exists(): + file = await asyncio.to_thread(open, path_to_metadata) try: file_content = json.load(file) finally: await asyncio.to_thread(file.close) try: metadata = KeyValueStoreMetadata(**file_content) - # For aliases, ensure the metadata.name is None - if alias is not None: - metadata = metadata.model_copy(update={'name': None}) except ValidationError as exc: - raise ValueError(f'Invalid metadata file for key-value store "{name}"') from exc + raise ValueError(f'Invalid metadata file for key-value store "{name or alias}"') from exc + + # Update metadata name to match the resolution. + metadata.name = name client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_kvs=path_to_kvs, lock=asyncio.Lock(), - directory_name=directory_name, ) await client._update_metadata(update_accessed_at=True) @@ -220,16 +188,15 @@ async def open( now = datetime.now(timezone.utc) metadata = KeyValueStoreMetadata( id=crypto_random_object_id(), - name=actual_name, # Use actual_name which will be None for aliases + name=name, created_at=now, accessed_at=now, modified_at=now, ) client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_kvs=path_to_kvs, lock=asyncio.Lock(), - directory_name=directory_name, ) await client._update_metadata() diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 73b2656d07..ae0621d6e8 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -89,9 +89,8 @@ def __init__( self, *, metadata: RequestQueueMetadata, - storage_dir: Path, + path_to_rq: Path, lock: asyncio.Lock, - directory_name: str | None = None, ) -> None: """Initialize a new instance. @@ -99,11 +98,8 @@ def __init__( """ self._metadata = metadata - self._storage_dir = storage_dir - """The base directory where the storage data are being persisted.""" - - self._directory_name = directory_name - """The directory name to use for this request queue. 
If None, uses metadata.name or default.""" + self._path_to_rq = path_to_rq + """The full path to the request queue directory.""" self._lock = lock """A lock to ensure that only one operation is performed at a time.""" @@ -133,14 +129,7 @@ async def get_metadata(self) -> RequestQueueMetadata: @property def path_to_rq(self) -> Path: """The full path to the request queue directory.""" - # Use the explicit directory name if provided, otherwise fall back to metadata.name or default - if self._directory_name is not None: - return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name - - if self._metadata.name is None: - return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT - - return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name + return self._path_to_rq @property def path_to_metadata(self) -> Path: @@ -179,8 +168,7 @@ async def open( if name is not None and alias is not None: raise ValueError('Cannot specify both name and alias parameters') - storage_dir = Path(configuration.storage_dir) - rq_base_path = storage_dir / cls._STORAGE_SUBDIR + rq_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR if not rq_base_path.exists(): await asyncio.to_thread(rq_base_path.mkdir, parents=True, exist_ok=True) @@ -192,12 +180,12 @@ async def open( if not rq_dir.is_dir(): continue - metadata_path = rq_dir / METADATA_FILENAME - if not metadata_path.exists(): + path_to_metadata = rq_dir / METADATA_FILENAME + if not path_to_metadata.exists(): continue try: - file = await asyncio.to_thread(metadata_path.open) + file = await asyncio.to_thread(path_to_metadata.open) try: file_content = json.load(file) metadata = RequestQueueMetadata(**file_content) @@ -205,9 +193,8 @@ async def open( if metadata.id == id: client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_rq=rq_base_path / rq_dir, lock=asyncio.Lock(), - directory_name=rq_dir.name, # Use the actual directory name ) await client._state.initialize() await client._discover_existing_requests() @@ -224,31 +211,13 @@ async def open( # Open an existing RQ by its name or alias, or create a new one if not found. else: - # Determine the directory name and metadata name based on whether this is a named or alias storage - if alias is not None: - # For alias storages, use the alias as directory name and set metadata.name to None - # Special case: alias='default' should use the same directory as default storage - directory_name = None if alias == 'default' else alias - actual_name = None - elif name is not None: - # For named storages, use the name as both directory name and metadata.name - directory_name = name - actual_name = name - else: - # For default storage (no name or alias), use None for both - same as alias='default' - directory_name = None - actual_name = None - - rq_path = ( - rq_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT - if directory_name is None - else rq_base_path / directory_name - ) - metadata_path = rq_path / METADATA_FILENAME + rq_dir = Path(name) if name else Path(alias) if alias else Path('default') + path_to_rq = rq_base_path / rq_dir + path_to_metadata = path_to_rq / METADATA_FILENAME # If the RQ directory exists, reconstruct the client from the metadata file. 
- if rq_path.exists() and metadata_path.exists(): - file = await asyncio.to_thread(open, metadata_path) + if path_to_rq.exists() and path_to_metadata.exists(): + file = await asyncio.to_thread(open, path_to_metadata) try: file_content = json.load(file) finally: @@ -256,15 +225,15 @@ async def open( try: metadata = RequestQueueMetadata(**file_content) except ValidationError as exc: - raise ValueError(f'Invalid metadata file for request queue "{name}"') from exc + raise ValueError(f'Invalid metadata file for request queue "{name or alias}"') from exc - metadata.name = actual_name # Use actual_name which will be None for aliases + # Update metadata name to match the resolution. + metadata.name = name client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_rq=path_to_rq, lock=asyncio.Lock(), - directory_name=directory_name, ) await client._state.initialize() @@ -276,7 +245,7 @@ async def open( now = datetime.now(timezone.utc) metadata = RequestQueueMetadata( id=crypto_random_object_id(), - name=actual_name, # Use actual_name which will be None for aliases + name=name, created_at=now, accessed_at=now, modified_at=now, @@ -287,9 +256,8 @@ async def open( ) client = cls( metadata=metadata, - storage_dir=storage_dir, + path_to_rq=path_to_rq, lock=asyncio.Lock(), - directory_name=directory_name, ) await client._state.initialize() await client._update_metadata() From 19b7ea3be6610326231e98c2b3d5e17588417fea Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 10 Sep 2025 09:40:47 +0200 Subject: [PATCH 3/8] Improve memory clients creation --- .../_file_system/_dataset_client.py | 5 +---- .../_file_system/_key_value_store_client.py | 10 ++++------ .../_file_system/_request_queue_client.py | 10 ++++------ .../_memory/_dataset_client.py | 20 +++++-------------- .../_memory/_key_value_store_client.py | 20 +++++-------------- .../_memory/_request_queue_client.py | 20 +++++-------------- 6 files changed, 24 insertions(+), 61 deletions(-) diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index 5222be4eb5..5c7b281101 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -113,7 +113,7 @@ async def open( ValueError: If a dataset with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate parameters + # Validate input parameters. specified_params = sum(1 for param in [id, name, alias] if param is not None) if specified_params > 1: raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') @@ -174,9 +174,6 @@ async def open( except ValidationError as exc: raise ValueError(f'Invalid metadata file for dataset "{name or alias}"') from exc - # Update metadata name to match the resolution. - metadata.name = name - client = cls( metadata=metadata, path_to_dataset=path_to_dataset, diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index e87fc8ec32..7975cd9262 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -112,9 +112,10 @@ async def open( ValueError: If a store with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. 
""" - # Validate parameters - exactly one of name or alias should be provided (or neither for default) - if name is not None and alias is not None: - raise ValueError('Cannot specify both name and alias parameters') + # Validate input parameters. + specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') kvs_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR @@ -172,9 +173,6 @@ async def open( except ValidationError as exc: raise ValueError(f'Invalid metadata file for key-value store "{name or alias}"') from exc - # Update metadata name to match the resolution. - metadata.name = name - client = cls( metadata=metadata, path_to_kvs=path_to_kvs, diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index ae0621d6e8..8485c47c5b 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -164,9 +164,10 @@ async def open( ValueError: If a queue with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate parameters - exactly one of name or alias should be provided (or neither for default) - if name is not None and alias is not None: - raise ValueError('Cannot specify both name and alias parameters') + # Validate input parameters. + specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') rq_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR @@ -227,9 +228,6 @@ async def open( except ValidationError as exc: raise ValueError(f'Invalid metadata file for request queue "{name or alias}"') from exc - # Update metadata name to match the resolution. - metadata.name = name - client = cls( metadata=metadata, path_to_rq=path_to_rq, diff --git a/src/crawlee/storage_clients/_memory/_dataset_client.py b/src/crawlee/storage_clients/_memory/_dataset_client.py index a28732b884..970678b6d1 100644 --- a/src/crawlee/storage_clients/_memory/_dataset_client.py +++ b/src/crawlee/storage_clients/_memory/_dataset_client.py @@ -72,20 +72,10 @@ async def open( Raises: ValueError: If both name and alias are provided, or if neither id, name, nor alias is provided. """ - # Validate parameters - exactly one of name or alias should be provided (or neither for default) - if name is not None and alias is not None: - raise ValueError('Cannot specify both name and alias parameters') - - # Determine the actual name to use in metadata - if alias is not None: - # For alias storages, metadata.name should be None (unnamed storage) - actual_name = None - elif name is not None: - # For named storages, use the provided name - actual_name = name - else: - # For default storage (no name or alias), use None - actual_name = None + # Validate input parameters. 
+ specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') # Create a new dataset dataset_id = id or crypto_random_object_id() @@ -93,7 +83,7 @@ async def open( metadata = DatasetMetadata( id=dataset_id, - name=actual_name, + name=name, created_at=now, accessed_at=now, modified_at=now, diff --git a/src/crawlee/storage_clients/_memory/_key_value_store_client.py b/src/crawlee/storage_clients/_memory/_key_value_store_client.py index b936e864a7..d4d05c9a60 100644 --- a/src/crawlee/storage_clients/_memory/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_memory/_key_value_store_client.py @@ -70,20 +70,10 @@ async def open( Raises: ValueError: If both name and alias are provided. """ - # Validate parameters - exactly one of name or alias should be provided (or neither for default) - if name is not None and alias is not None: - raise ValueError('Cannot specify both name and alias parameters') - - # Determine the actual name to use in metadata - if alias is not None: - # For alias storages, metadata.name should be None (unnamed storage) - actual_name = None - elif name is not None: - # For named storages, use the provided name - actual_name = name - else: - # For default storage (no name or alias), use None - actual_name = None + # Validate input parameters. + specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') # Create a new key-value store store_id = id or crypto_random_object_id() @@ -91,7 +81,7 @@ async def open( metadata = KeyValueStoreMetadata( id=store_id, - name=actual_name, + name=name, created_at=now, accessed_at=now, modified_at=now, diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index ba005029c2..cc6cb299d4 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -82,20 +82,10 @@ async def open( Raises: ValueError: If both name and alias are provided. """ - # Validate parameters - exactly one of name or alias should be provided (or neither for default) - if name is not None and alias is not None: - raise ValueError('Cannot specify both name and alias parameters') - - # Determine the actual name to use in metadata - if alias is not None: - # For alias storages, metadata.name should be None (unnamed storage) - actual_name = None - elif name is not None: - # For named storages, use the provided name - actual_name = name - else: - # For default storage (no name or alias), use None - actual_name = None + # Validate input parameters. 
+ specified_params = sum(1 for param in [id, name, alias] if param is not None) + if specified_params > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') # Create a new queue queue_id = id or crypto_random_object_id() @@ -103,7 +93,7 @@ async def open( metadata = RequestQueueMetadata( id=queue_id, - name=actual_name, + name=name, created_at=now, accessed_at=now, modified_at=now, From 5d3992320cf88d176e7db4e5f7b92d4a7a9485f9 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 10 Sep 2025 10:55:34 +0200 Subject: [PATCH 4/8] Improve tests --- tests/unit/storages/test_dataset.py | 24 ----------------------- tests/unit/storages/test_request_queue.py | 9 +++++---- 2 files changed, 5 insertions(+), 28 deletions(-) diff --git a/tests/unit/storages/test_dataset.py b/tests/unit/storages/test_dataset.py index 3858e796ff..c8ce6daf01 100644 --- a/tests/unit/storages/test_dataset.py +++ b/tests/unit/storages/test_dataset.py @@ -849,30 +849,6 @@ async def test_default_vs_alias_default_equivalence( await default_dataset.drop() -async def test_alias_parameter_validation( - storage_client: StorageClient, - configuration: Configuration, -) -> None: - """Test alias parameter validation.""" - # Should not allow both name and alias - with pytest.raises(ValueError, match=r'Only one of'): - await Dataset.open( - name='test', - alias='test', - storage_client=storage_client, - configuration=configuration, - ) - - # Valid alias should work - alias_dataset = await Dataset.open( - alias='valid_alias', - storage_client=storage_client, - configuration=configuration, - ) - assert alias_dataset.name is None - await alias_dataset.drop() - - async def test_multiple_alias_isolation( storage_client: StorageClient, configuration: Configuration, diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index ed733a866c..9594195f31 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -667,8 +667,9 @@ async def test_open_with_alias( assert rq_2.name is None # Add different requests to each - await rq_1.add_request('https://example1.com') - await rq_2.add_request('https://example2.com') + await rq_1.add_request('https://example.com/1') + await rq_1.add_request('https://example.com/2') + await rq_2.add_request('https://example.com/3') # Verify data isolation request_1 = await rq_1.fetch_next_request() @@ -676,8 +677,8 @@ async def test_open_with_alias( assert request_1 is not None assert request_2 is not None - assert request_1.url == 'https://example1.com' - assert request_2.url == 'https://example2.com' + assert request_1.url == 'https://example.com/1' + assert request_2.url == 'https://example.com/3' # Clean up await rq_1.drop() From f8aba0c645e13355ba76e7813b51be3d8b90476d Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 10 Sep 2025 14:14:41 +0200 Subject: [PATCH 5/8] Add docs --- docs/guides/code_examples/storages/opening.py | 19 ++++++++++++ docs/guides/storages.mdx | 31 +++++++++++++------ 2 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 docs/guides/code_examples/storages/opening.py diff --git a/docs/guides/code_examples/storages/opening.py b/docs/guides/code_examples/storages/opening.py new file mode 100644 index 0000000000..0e72d574a2 --- /dev/null +++ b/docs/guides/code_examples/storages/opening.py @@ -0,0 +1,19 @@ +import asyncio + +from crawlee.storages import Dataset + + +async def main() -> None: + # Named storage (persists across runs) + dataset_named = 
await Dataset.open(name='my-persistent-dataset')
+
+    # Unnamed storage with alias (purged on start)
+    dataset_unnamed = await Dataset.open(alias='temporary-results')
+
+    # Default unnamed storage (both are equivalent and purged on start)
+    dataset_default = await Dataset.open()
+    dataset_default = await Dataset.open(alias='default')
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/guides/storages.mdx b/docs/guides/storages.mdx
index 227b08af14..076b54647b 100644
--- a/docs/guides/storages.mdx
+++ b/docs/guides/storages.mdx
@@ -9,6 +9,8 @@ import Tabs from '@theme/Tabs';
 import TabItem from '@theme/TabItem';
 import RunnableCodeBlock from '@site/src/components/RunnableCodeBlock';
 
+import OpeningExample from '!!raw-loader!roa-loader!./code_examples/storages/opening.py';
+
 import RqBasicExample from '!!raw-loader!roa-loader!./code_examples/storages/rq_basic_example.py';
 import RqWithCrawlerExample from '!!raw-loader!roa-loader!./code_examples/storages/rq_with_crawler_example.py';
 import RqWithCrawlerExplicitExample from '!!raw-loader!roa-loader!./code_examples/storages/rq_with_crawler_explicit_example.py';
@@ -26,7 +28,9 @@ import KvsWithCrawlerExplicitExample from '!!raw-loader!roa-loader!./code_exampl
 import CleaningDoNotPurgeExample from '!!raw-loader!roa-loader!./code_examples/storages/cleaning_do_not_purge_example.py';
 import CleaningPurgeExplicitlyExample from '!!raw-loader!roa-loader!./code_examples/storages/cleaning_purge_explicitly_example.py';
 
-Crawlee offers several storage types for managing and persisting your crawling data. Request-oriented storages, such as the `RequestQueue`, help you store and deduplicate URLs, while result-oriented storages, like `Dataset` and `KeyValueStore`, focus on storing and retrieving scraping results. This guide helps you choose the storage type that suits your needs.
+Crawlee offers several storage types for managing and persisting your crawling data. Request-oriented storages, such as the `RequestQueue`, help you store and deduplicate URLs, while result-oriented storages, like `Dataset` and `KeyValueStore`, focus on storing and retrieving scraping results. This guide explains when to use each type, how to interact with them, and how to control their lifecycle.
+
+## Overview
 
 Crawlee's storage system consists of two main layers:
 - **Storages** (`Dataset`, `KeyValueStore`, `RequestQueue`): High-level interfaces for interacting with different storage types.
@@ -70,6 +74,21 @@ Storage --|> KeyValueStore
 Storage --|> RequestQueue
 ```
 
+### Named and unnamed storages
+
+Crawlee supports two types of storages:
+
+- **Named storages**: Storages with a specific name that persist across runs. These are useful when you want to share data between different crawler runs or access the same storage from multiple places.
+- **Unnamed storages**: Temporary storages identified by an alias that are scoped to a single run. These are automatically purged at the start of each run (when `purge_on_start` is enabled, which is the default).
+
+### Default storage
+
+Each storage type (`Dataset`, `KeyValueStore`, `RequestQueue`) has a default instance that can be accessed without specifying `id`, `name`, or `alias`. The default unnamed storage is accessed by calling the storage's `open` method without parameters. This is the most common way to use storages in simple crawlers.
The special alias `"default"` is equivalent to calling `open` without parameters.
+
+<RunnableCodeBlock className="language-python" language="python">
+	{OpeningExample}
+</RunnableCodeBlock>
+
 ## Request queue
 
 The `RequestQueue` is the primary storage for URLs in Crawlee, especially useful for deep crawling. It supports dynamic addition of URLs, making it ideal for recursive tasks where URLs are discovered and added during the crawling process (e.g., following links across multiple pages). Each Crawlee project has a **default request queue**, which can be used to store URLs during a specific run.
@@ -186,13 +205,7 @@ Crawlee provides the following helper function to simplify interactions with the
 
 ## Cleaning up the storages
 
-By default, Crawlee automatically cleans up **default storages** before each crawler run to ensure a clean state. This behavior is controlled by the `Configuration.purge_on_start` setting (default: `True`).
-
-### What gets purged
-
-- **Default storages** are completely removed and recreated at the start of each run, ensuring that you start with a clean slate.
-- **Named storages** are never automatically purged and persist across runs.
-- The behavior depends on the storage client implementation.
+By default, Crawlee cleans up all unnamed storages (including the default one) at the start of each run, so every crawl begins with a clean state. This behavior is controlled by `Configuration.purge_on_start` (default: `True`). In contrast, named storages are never purged automatically and persist across runs. The exact behavior may vary depending on the storage client implementation.
 
 ### When purging happens
 
@@ -221,6 +234,6 @@ Note that purging behavior may vary between storage client implementations. For
 
 ## Conclusion
 
-This guide introduced you to the different storage types available in Crawlee and how to interact with them. You learned how to manage requests using the `RequestQueue` and store and retrieve scraping results using the `Dataset` and `KeyValueStore`. You also discovered how to use helper functions to simplify interactions with these storages. Finally, you learned how to clean up storages before starting a crawler run.
+This guide introduced you to the different storage types available in Crawlee and how to interact with them. You learned about the distinction between named storages (persistent across runs) and unnamed storages with aliases (temporary and purged on start). You discovered how to manage requests using the `RequestQueue` and store and retrieve scraping results using the `Dataset` and `KeyValueStore`. You also learned how to use helper functions to simplify interactions with these storages and how to control storage cleanup behavior.
 
 If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/crawlee-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU). Happy scraping!
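A minimal sketch of the semantics documented above, assuming the `Dataset.open(name=..., alias=...)` signature and the mutual-exclusion validation introduced earlier in this series; the assertions mirror the PR's own tests:

```python
import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    # Mixing identifiers is rejected by the new validation in `open()`.
    try:
        await Dataset.open(name='results', alias='results')
    except ValueError as exc:
        print(exc)  # Only one of "id", "name", or "alias" can be specified, not multiple.

    # An alias-scoped dataset is unnamed: its metadata name stays None.
    scratch = await Dataset.open(alias='scratch')
    assert scratch.name is None
    await scratch.drop()


if __name__ == '__main__':
    asyncio.run(main())
```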
From 50dde4253ba61a5d1a2e17df29f5e80c30a13147 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Thu, 11 Sep 2025 09:03:43 +0200 Subject: [PATCH 6/8] address feedback --- src/crawlee/_types.py | 12 +++++----- src/crawlee/crawlers/_basic/_basic_crawler.py | 14 +++++------ .../_file_system/_dataset_client.py | 4 ++-- .../_file_system/_key_value_store_client.py | 4 ++-- .../_file_system/_request_queue_client.py | 4 ++-- .../_memory/_dataset_client.py | 4 ++-- .../_memory/_key_value_store_client.py | 4 ++-- .../_memory/_request_queue_client.py | 4 ++-- tests/unit/storages/test_key_value_store.py | 24 ------------------- tests/unit/storages/test_request_queue.py | 24 ------------------- 10 files changed, 25 insertions(+), 73 deletions(-) diff --git a/src/crawlee/_types.py b/src/crawlee/_types.py index b5ce90491e..51f9d357e7 100644 --- a/src/crawlee/_types.py +++ b/src/crawlee/_types.py @@ -434,8 +434,8 @@ def __call__( Args: id: The ID of the `KeyValueStore` to get. - name: The name of the `KeyValueStore` to get (global scope). - alias: The alias of the `KeyValueStore` to get (run scope, unnamed). + name: The name of the `KeyValueStore` to get (global scope, named storage). + alias: The alias of the `KeyValueStore` to get (run scope, unnamed storage). """ @@ -456,8 +456,8 @@ def __call__( Args: id: The ID of the `KeyValueStore` to get. - name: The name of the `KeyValueStore` to get (global scope). - alias: The alias of the `KeyValueStore` to get (run scope, unnamed). + name: The name of the `KeyValueStore` to get (global scope, named storage). + alias: The alias of the `KeyValueStore` to get (run scope, unnamed storage). """ @@ -482,8 +482,8 @@ def __call__( Args: data: The data to push to the `Dataset`. dataset_id: The ID of the `Dataset` to push the data to. - dataset_name: The name of the `Dataset` to push the data to (global scope). - dataset_alias: The alias of the `Dataset` to push the data to (run scope, unnamed). + dataset_name: The name of the `Dataset` to push the data to (global scope, named storage). + dataset_alias: The alias of the `Dataset` to push the data to (run scope, unnamed storage). **kwargs: Additional keyword arguments. """ diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index e7d983b1fe..1c49b57188 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -784,8 +784,8 @@ async def get_data( Args: dataset_id: The ID of the `Dataset`. - dataset_name: The name of the `Dataset` (global scope). - dataset_alias: The alias of the `Dataset` (run scope, unnamed). + dataset_name: The name of the `Dataset` (global scope, named storage). + dataset_alias: The alias of the `Dataset` (run scope, unnamed storage). kwargs: Keyword arguments to be passed to the `Dataset.get_data()` method. Returns: @@ -809,9 +809,9 @@ async def export_data( Args: path: The destination file path. Must end with '.json' or '.csv'. - dataset_id: The ID of the Dataset to export from. If None, uses `name` parameter instead. - dataset_name: The name of the Dataset to export from (global scope). If None, uses `id` parameter instead. - dataset_alias: The alias of the Dataset to export from (run scope, unnamed). + dataset_id: The ID of the Dataset to export from. + dataset_name: The name of the Dataset to export from (global scope, named storage). + dataset_alias: The alias of the Dataset to export from (run scope, unnamed storage). 
""" dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias) @@ -841,8 +841,8 @@ async def _push_data( Args: data: The data to push to the `Dataset`. dataset_id: The ID of the `Dataset`. - dataset_name: The name of the `Dataset` (global scope). - dataset_alias: The alias of the `Dataset` (run scope, unnamed). + dataset_name: The name of the `Dataset` (global scope, named storage). + dataset_alias: The alias of the `Dataset` (run scope, unnamed storage). kwargs: Keyword arguments to be passed to the `Dataset.push_data()` method. """ dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias) diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index 5c7b281101..c5e1171295 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -102,8 +102,8 @@ async def open( Args: id: The ID of the dataset to open. If provided, searches for existing dataset by ID. - name: The name of the dataset for named storages. Mutually exclusive with alias. - alias: The alias of the dataset for unnamed storages. Mutually exclusive with name. + name: The name of the dataset for named (global scope) storages. + alias: The alias of the dataset for unnamed (run scope) storages. configuration: The configuration object containing storage directory settings. Returns: diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index 7975cd9262..a51143f0a6 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -101,8 +101,8 @@ async def open( Args: id: The ID of the key-value store to open. If provided, searches for existing store by ID. - name: The name of the key-value store for named storages. Mutually exclusive with alias. - alias: The alias of the key-value store for unnamed storages. Mutually exclusive with name. + name: The name of the key-value store for named (global scope) storages. + alias: The alias of the key-value store for unnamed (run scope) storages. configuration: The configuration object containing storage directory settings. Returns: diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 8485c47c5b..d24a88bf80 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -153,8 +153,8 @@ async def open( Args: id: The ID of the request queue to open. If provided, searches for existing queue by ID. - name: The name of the request queue for named storages. Mutually exclusive with alias. - alias: The alias of the request queue for unnamed storages. Mutually exclusive with name. + name: The name of the request queue for named (global scope) storages. + alias: The alias of the request queue for unnamed (run scope) storages. configuration: The configuration object containing storage directory settings. 
Returns: diff --git a/src/crawlee/storage_clients/_memory/_dataset_client.py b/src/crawlee/storage_clients/_memory/_dataset_client.py index 970678b6d1..b28c320ef5 100644 --- a/src/crawlee/storage_clients/_memory/_dataset_client.py +++ b/src/crawlee/storage_clients/_memory/_dataset_client.py @@ -63,8 +63,8 @@ async def open( Args: id: The ID of the dataset. If not provided, a random ID will be generated. - name: The name of the dataset for named storages. Mutually exclusive with alias. - alias: The alias of the dataset for unnamed storages. Mutually exclusive with name. + name: The name of the dataset for named (global scope) storages. + alias: The alias of the dataset for unnamed (run scope) storages. Returns: An instance for the opened or created storage client. diff --git a/src/crawlee/storage_clients/_memory/_key_value_store_client.py b/src/crawlee/storage_clients/_memory/_key_value_store_client.py index d4d05c9a60..4fd83b0976 100644 --- a/src/crawlee/storage_clients/_memory/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_memory/_key_value_store_client.py @@ -61,8 +61,8 @@ async def open( Args: id: The ID of the key-value store. If not provided, a random ID will be generated. - name: The name of the key-value store for named storages. Mutually exclusive with alias. - alias: The alias of the key-value store for unnamed storages. Mutually exclusive with name. + name: The name of the key-value store for named (global scope) storages. + alias: The alias of the key-value store for unnamed (run scope) storages. Returns: An instance for the opened or created storage client. diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index cc6cb299d4..85988b2cb3 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -73,8 +73,8 @@ async def open( Args: id: The ID of the request queue. If not provided, a random ID will be generated. - name: The name of the request queue for named storages. Mutually exclusive with alias. - alias: The alias of the request queue for unnamed storages. Mutually exclusive with name. + name: The name of the request queue for named (global scope) storages. + alias: The alias of the request queue for unnamed (run scope) storages. Returns: An instance for the opened or created storage client. 
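The same three-line guard now appears in each storage client's `open()` across both the file-system and memory backends, as well as in the storage instance manager. A shared helper along these lines (a hypothetical refactoring sketch, not part of this patch) would express the rule once:

```python
def _ensure_single_identifier(
    id: str | None,
    name: str | None,
    alias: str | None,
) -> None:
    """Hypothetical helper: reject calls that mix `id`, `name`, and `alias`."""
    # Count how many of the three identifiers the caller actually supplied.
    specified_params = sum(1 for param in (id, name, alias) if param is not None)
    if specified_params > 1:
        raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
```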
diff --git a/tests/unit/storages/test_key_value_store.py b/tests/unit/storages/test_key_value_store.py index 00d9db5b05..1b2f9209ec 100644 --- a/tests/unit/storages/test_key_value_store.py +++ b/tests/unit/storages/test_key_value_store.py @@ -909,30 +909,6 @@ async def test_default_vs_alias_default_equivalence( await default_kvs.drop() -async def test_alias_parameter_validation( - storage_client: StorageClient, - configuration: Configuration, -) -> None: - """Test alias parameter validation.""" - # Should not allow both name and alias - with pytest.raises(ValueError, match=r'Only one of'): - await KeyValueStore.open( - name='test', - alias='test', - storage_client=storage_client, - configuration=configuration, - ) - - # Valid alias should work - alias_kvs = await KeyValueStore.open( - alias='valid_alias', - storage_client=storage_client, - configuration=configuration, - ) - assert alias_kvs.name is None - await alias_kvs.drop() - - async def test_multiple_alias_isolation( storage_client: StorageClient, configuration: Configuration, diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 9594195f31..b2bfda0394 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -1018,30 +1018,6 @@ async def test_default_vs_alias_default_equivalence( await default_rq.drop() -async def test_alias_parameter_validation( - storage_client: StorageClient, - configuration: Configuration, -) -> None: - """Test alias parameter validation.""" - # Should not allow both name and alias - with pytest.raises(ValueError, match=r'Only one of'): - await RequestQueue.open( - name='test', - alias='test', - storage_client=storage_client, - configuration=configuration, - ) - - # Valid alias should work - alias_rq = await RequestQueue.open( - alias='valid_alias', - storage_client=storage_client, - configuration=configuration, - ) - assert alias_rq.name is None - await alias_rq.drop() - - async def test_multiple_alias_isolation( storage_client: StorageClient, configuration: Configuration, From ff4058f408a0e3d9f82f6232801a52b360557178 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Thu, 11 Sep 2025 12:18:08 +0200 Subject: [PATCH 7/8] fix --- src/crawlee/storage_clients/_file_system/_dataset_client.py | 2 +- .../storage_clients/_file_system/_key_value_store_client.py | 2 +- .../storage_clients/_file_system/_request_queue_client.py | 2 +- src/crawlee/storage_clients/_memory/_dataset_client.py | 2 +- src/crawlee/storage_clients/_memory/_key_value_store_client.py | 2 +- src/crawlee/storage_clients/_memory/_request_queue_client.py | 2 +- src/crawlee/storages/_storage_instance_manager.py | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index c5e1171295..bf9cd08697 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -91,7 +91,7 @@ async def open( *, id: str | None, name: str | None, - alias: str | None = None, + alias: str | None, configuration: Configuration, ) -> FileSystemDatasetClient: """Open or create a file system dataset client. 
diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index a51143f0a6..af32eb468f 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -90,7 +90,7 @@ async def open( *, id: str | None, name: str | None, - alias: str | None = None, + alias: str | None, configuration: Configuration, ) -> FileSystemKeyValueStoreClient: """Open or create a file system key-value store client. diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index d24a88bf80..a02773c1b7 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -142,7 +142,7 @@ async def open( *, id: str | None, name: str | None, - alias: str | None = None, + alias: str | None, configuration: Configuration, ) -> FileSystemRequestQueueClient: """Open or create a file system request queue client. diff --git a/src/crawlee/storage_clients/_memory/_dataset_client.py b/src/crawlee/storage_clients/_memory/_dataset_client.py index b28c320ef5..5a17959508 100644 --- a/src/crawlee/storage_clients/_memory/_dataset_client.py +++ b/src/crawlee/storage_clients/_memory/_dataset_client.py @@ -53,7 +53,7 @@ async def open( *, id: str | None, name: str | None, - alias: str | None = None, + alias: str | None, ) -> MemoryDatasetClient: """Open or create a new memory dataset client. diff --git a/src/crawlee/storage_clients/_memory/_key_value_store_client.py b/src/crawlee/storage_clients/_memory/_key_value_store_client.py index 4fd83b0976..bc53bae347 100644 --- a/src/crawlee/storage_clients/_memory/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_memory/_key_value_store_client.py @@ -51,7 +51,7 @@ async def open( *, id: str | None, name: str | None, - alias: str | None = None, + alias: str | None, ) -> MemoryKeyValueStoreClient: """Open or create a new memory key-value store client. diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index 85988b2cb3..600abf30c0 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -63,7 +63,7 @@ async def open( *, id: str | None, name: str | None, - alias: str | None = None, + alias: str | None, ) -> MemoryRequestQueueClient: """Open or create a new memory request queue client. diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index f07933f516..ea86ac7311 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -67,7 +67,7 @@ async def open_storage_instance( Raises: ValueError: If multiple parameters out of `id`, `name`, and `alias` are specified. """ - # Validate parameters + # Validate input parameters. 
specified_params = sum(1 for param in [id, name, alias] if param is not None)
         if specified_params > 1:
             raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')

From 304dce627673fbf11daa3863a9f6ae1853dbe7f3 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Fri, 12 Sep 2025 17:55:54 +0200
Subject: [PATCH 8/8] Add comment about alias in memory storages

---
 src/crawlee/storage_clients/_memory/_dataset_client.py         | 3 +++
 src/crawlee/storage_clients/_memory/_key_value_store_client.py | 3 +++
 src/crawlee/storage_clients/_memory/_request_queue_client.py   | 3 +++
 3 files changed, 9 insertions(+)

diff --git a/src/crawlee/storage_clients/_memory/_dataset_client.py b/src/crawlee/storage_clients/_memory/_dataset_client.py
index 5a17959508..15b3a17e45 100644
--- a/src/crawlee/storage_clients/_memory/_dataset_client.py
+++ b/src/crawlee/storage_clients/_memory/_dataset_client.py
@@ -61,6 +61,9 @@ async def open(
         datasets don't check for existing datasets with the same name or ID since all data exists
         only in memory and is lost when the process terminates.
 
+        The alias has no effect on the memory storage client implementation: since data are not
+        persisted, unnamed storages are supported by default.
+
         Args:
             id: The ID of the dataset. If not provided, a random ID will be generated.
             name: The name of the dataset for named (global scope) storages.
diff --git a/src/crawlee/storage_clients/_memory/_key_value_store_client.py b/src/crawlee/storage_clients/_memory/_key_value_store_client.py
index bc53bae347..0cf70dbbb8 100644
--- a/src/crawlee/storage_clients/_memory/_key_value_store_client.py
+++ b/src/crawlee/storage_clients/_memory/_key_value_store_client.py
@@ -59,6 +59,9 @@ async def open(
         memory KVS don't check for existing stores with the same name or ID since all data exists
         only in memory and is lost when the process terminates.
 
+        The alias has no effect on the memory storage client implementation: since data are not
+        persisted, unnamed storages are supported by default.
+
         Args:
             id: The ID of the key-value store. If not provided, a random ID will be generated.
             name: The name of the key-value store for named (global scope) storages.
diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py
index 600abf30c0..e3bcfc9d6e 100644
--- a/src/crawlee/storage_clients/_memory/_request_queue_client.py
+++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py
@@ -71,6 +71,9 @@ async def open(
         memory queues don't check for existing queues with the same name or ID since all data exists
         only in memory and is lost when the process terminates.
 
+        The alias has no effect on the memory storage client implementation: since data are not
+        persisted, unnamed storages are supported by default.
+
         Args:
             id: The ID of the request queue. If not provided, a random ID will be generated.
             name: The name of the request queue for named (global scope) storages.
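The run scope that these comments describe is what the updated alias tests exercise. Condensed into a standalone sketch, assuming the `alias` parameter that this series adds to `RequestQueue.open`:

```python
import asyncio

from crawlee.storages import RequestQueue


async def main() -> None:
    # Alias-scoped queues are unnamed and isolated from one another.
    rq_a = await RequestQueue.open(alias='queue-a')
    rq_b = await RequestQueue.open(alias='queue-b')
    assert rq_a.name is None
    assert rq_b.name is None

    await rq_a.add_request('https://example.com/1')
    await rq_b.add_request('https://example.com/2')

    # Each queue yields only the requests added to it.
    request_a = await rq_a.fetch_next_request()
    assert request_a is not None
    assert request_a.url == 'https://example.com/1'

    # Drop both run-scoped queues when done.
    await rq_a.drop()
    await rq_b.drop()


if __name__ == '__main__':
    asyncio.run(main())
```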