diff --git a/docs/guides/code_examples/storages/opening.py b/docs/guides/code_examples/storages/opening.py
new file mode 100644
index 0000000000..0e72d574a2
--- /dev/null
+++ b/docs/guides/code_examples/storages/opening.py
@@ -0,0 +1,19 @@
+import asyncio
+
+from crawlee.storages import Dataset
+
+
+async def main() -> None:
+ # Named storage (persists across runs)
+ dataset_named = await Dataset.open(name='my-persistent-dataset')
+
+ # Unnamed storage with alias (purged on start)
+ dataset_unnamed = await Dataset.open(alias='temporary-results')
+
+ # Default unnamed storage (the two calls below are equivalent and purged on start)
+ dataset_default = await Dataset.open()
+ dataset_default = await Dataset.open(alias='default')
+
+
+if __name__ == '__main__':
+ asyncio.run(main())
diff --git a/docs/guides/storages.mdx b/docs/guides/storages.mdx
index 227b08af14..076b54647b 100644
--- a/docs/guides/storages.mdx
+++ b/docs/guides/storages.mdx
@@ -9,6 +9,8 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import RunnableCodeBlock from '@site/src/components/RunnableCodeBlock';
+import OpeningExample from '!!raw-loader!roa-loader!./code_examples/storages/opening.py';
+
import RqBasicExample from '!!raw-loader!roa-loader!./code_examples/storages/rq_basic_example.py';
import RqWithCrawlerExample from '!!raw-loader!roa-loader!./code_examples/storages/rq_with_crawler_example.py';
import RqWithCrawlerExplicitExample from '!!raw-loader!roa-loader!./code_examples/storages/rq_with_crawler_explicit_example.py';
@@ -26,7 +28,9 @@ import KvsWithCrawlerExplicitExample from '!!raw-loader!roa-loader!./code_exampl
import CleaningDoNotPurgeExample from '!!raw-loader!roa-loader!./code_examples/storages/cleaning_do_not_purge_example.py';
import CleaningPurgeExplicitlyExample from '!!raw-loader!roa-loader!./code_examples/storages/cleaning_purge_explicitly_example.py';
-Crawlee offers several storage types for managing and persisting your crawling data. Request-oriented storages, such as the `RequestQueue`, help you store and deduplicate URLs, while result-oriented storages, like `Dataset` and `KeyValueStore`, focus on storing and retrieving scraping results. This guide helps you choose the storage type that suits your needs.
+Crawlee offers several storage types for managing and persisting your crawling data. Request-oriented storages, such as the `RequestQueue`, help you store and deduplicate URLs, while result-oriented storages, like `Dataset` and `KeyValueStore`, focus on storing and retrieving scraping results. This guide explains when to use each type, how to interact with them, and how to control their lifecycle.
+
+## Overview
Crawlee's storage system consists of two main layers:
- **Storages** (`Dataset`, `KeyValueStore`, `RequestQueue`): High-level interfaces for interacting with different storage types.
@@ -70,6 +74,21 @@ Storage --|> KeyValueStore
Storage --|> RequestQueue
```
+### Named and unnamed storages
+
+Crawlee supports two types of storages:
+
+- **Named storages**: Storages identified by a specific name that persist across runs. They are useful when you want to share data between different crawler runs or access the same storage from multiple places, as shown in the sketch below.
+- **Unnamed storages**: Temporary storages identified by an alias that are scoped to a single run. These are automatically purged at the start of each run (when `purge_on_start` is enabled, which is the default).
+
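+A minimal sketch (the dataset name `products` is only illustrative) of how a named dataset is shared between `open` calls and keeps its data for later runs:
+
+```python
+import asyncio
+
+from crawlee.storages import Dataset
+
+
+async def main() -> None:
+    # Open (or create) a named dataset that persists across runs.
+    products = await Dataset.open(name='products')
+    await products.push_data({'title': 'Example product'})
+
+    # Opening the same name again returns the same storage and its data.
+    products_again = await Dataset.open(name='products')
+    data = await products_again.get_data()
+    print(data.items)
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+```
+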
+### Default storage
+
+Each storage type (`Dataset`, `KeyValueStore`, `RequestQueue`) has a default instance that can be accessed without specifying an `id`, `name`, or `alias`. The default unnamed storage is opened by calling the storage's `open` method without parameters, which is the most common way to use storages in simple crawlers. The special alias `"default"` is equivalent to calling `open` without parameters.
+
+<RunnableCodeBlock className="language-python" language="python">
+    {OpeningExample}
+</RunnableCodeBlock>
+
## Request queue
The `RequestQueue` is the primary storage for URLs in Crawlee, especially useful for deep crawling. It supports dynamic addition of URLs, making it ideal for recursive tasks where URLs are discovered and added during the crawling process (e.g., following links across multiple pages). Each Crawlee project has a **default request queue**, which can be used to store URLs during a specific run.
@@ -186,13 +205,7 @@ Crawlee provides the following helper function to simplify interactions with the
## Cleaning up the storages
-By default, Crawlee automatically cleans up **default storages** before each crawler run to ensure a clean state. This behavior is controlled by the `Configuration.purge_on_start` setting (default: `True`).
-
-### What gets purged
-
-- **Default storages** are completely removed and recreated at the start of each run, ensuring that you start with a clean slate.
-- **Named storages** are never automatically purged and persist across runs.
-- The behavior depends on the storage client implementation.
+By default, Crawlee cleans up all unnamed storages (including the default one) at the start of each run, so every crawl begins with a clean state. This behavior is controlled by the `Configuration.purge_on_start` setting (default: `True`). In contrast, named storages are never purged automatically and persist across runs. The exact behavior may vary depending on the storage client implementation.
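+
+If you want unnamed storages to keep their data between runs, disable purging via the configuration. A minimal sketch (the alias name is only illustrative; the runnable examples below show further options):
+
+```python
+import asyncio
+
+from crawlee.configuration import Configuration
+from crawlee.storages import Dataset
+
+
+async def main() -> None:
+    # Disable automatic purging of unnamed storages.
+    configuration = Configuration(purge_on_start=False)
+
+    # This unnamed (alias) dataset now keeps its data between runs.
+    dataset = await Dataset.open(alias='temporary-results', configuration=configuration)
+    await dataset.push_data({'item': 'kept between runs'})
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+```
+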
### When purging happens
@@ -221,6 +234,6 @@ Note that purging behavior may vary between storage client implementations. For
## Conclusion
-This guide introduced you to the different storage types available in Crawlee and how to interact with them. You learned how to manage requests using the `RequestQueue` and store and retrieve scraping results using the `Dataset` and `KeyValueStore`. You also discovered how to use helper functions to simplify interactions with these storages. Finally, you learned how to clean up storages before starting a crawler run.
+This guide introduced you to the different storage types available in Crawlee and how to interact with them. You learned about the distinction between named storages (persistent across runs) and unnamed storages with aliases (temporary and purged on start). You discovered how to manage requests using the `RequestQueue` and store and retrieve scraping results using the `Dataset` and `KeyValueStore`. You also learned how to use helper functions to simplify interactions with these storages and how to control storage cleanup behavior.
If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/crawlee-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU). Happy scraping!
diff --git a/src/crawlee/_types.py b/src/crawlee/_types.py
index 42ac9412bd..51f9d357e7 100644
--- a/src/crawlee/_types.py
+++ b/src/crawlee/_types.py
@@ -189,6 +189,7 @@ class PushDataFunctionCall(PushDataKwargs):
data: list[dict[str, Any]] | dict[str, Any]
dataset_id: str | None
dataset_name: str | None
+ dataset_alias: str | None
class KeyValueStoreInterface(Protocol):
@@ -255,7 +256,7 @@ def __init__(self, *, key_value_store_getter: GetKeyValueStoreFunction) -> None:
self._key_value_store_getter = key_value_store_getter
self.add_requests_calls = list[AddRequestsKwargs]()
self.push_data_calls = list[PushDataFunctionCall]()
- self.key_value_store_changes = dict[tuple[str | None, str | None], KeyValueStoreChangeRecords]()
+ self.key_value_store_changes = dict[tuple[str | None, str | None, str | None], KeyValueStoreChangeRecords]()
async def add_requests(
self,
@@ -270,6 +271,7 @@ async def push_data(
data: list[dict[str, Any]] | dict[str, Any],
dataset_id: str | None = None,
dataset_name: str | None = None,
+ dataset_alias: str | None = None,
**kwargs: Unpack[PushDataKwargs],
) -> None:
"""Track a call to the `push_data` context helper."""
@@ -278,6 +280,7 @@ async def push_data(
data=data,
dataset_id=dataset_id,
dataset_name=dataset_name,
+ dataset_alias=dataset_alias,
**kwargs,
)
)
@@ -287,13 +290,14 @@ async def get_key_value_store(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
) -> KeyValueStoreInterface:
- if (id, name) not in self.key_value_store_changes:
- self.key_value_store_changes[id, name] = KeyValueStoreChangeRecords(
- await self._key_value_store_getter(id=id, name=name)
+ if (id, name, alias) not in self.key_value_store_changes:
+ self.key_value_store_changes[id, name, alias] = KeyValueStoreChangeRecords(
+ await self._key_value_store_getter(id=id, name=name, alias=alias)
)
- return self.key_value_store_changes[id, name]
+ return self.key_value_store_changes[id, name, alias]
@docs_group('Functions')
@@ -424,12 +428,14 @@ def __call__(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
) -> Coroutine[None, None, KeyValueStore]:
"""Call dunder method.
Args:
id: The ID of the `KeyValueStore` to get.
- name: The name of the `KeyValueStore` to get.
+ name: The name of the `KeyValueStore` to get (global scope, named storage).
+ alias: The alias of the `KeyValueStore` to get (run scope, unnamed storage).
"""
@@ -444,12 +450,14 @@ def __call__(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
) -> Coroutine[None, None, KeyValueStoreInterface]:
"""Call dunder method.
Args:
id: The ID of the `KeyValueStore` to get.
- name: The name of the `KeyValueStore` to get.
+ name: The name of the `KeyValueStore` to get (global scope, named storage).
+ alias: The alias of the `KeyValueStore` to get (run scope, unnamed storage).
"""
@@ -466,6 +474,7 @@ def __call__(
data: list[dict[str, Any]] | dict[str, Any],
dataset_id: str | None = None,
dataset_name: str | None = None,
+ dataset_alias: str | None = None,
**kwargs: Unpack[PushDataKwargs],
) -> Coroutine[None, None, None]:
"""Call dunder method.
@@ -473,7 +482,8 @@ def __call__(
Args:
data: The data to push to the `Dataset`.
dataset_id: The ID of the `Dataset` to push the data to.
- dataset_name: The name of the `Dataset` to push the data to.
+ dataset_name: The name of the `Dataset` to push the data to (global scope, named storage).
+ dataset_alias: The alias of the `Dataset` to push the data to (run scope, unnamed storage).
**kwargs: Additional keyword arguments.
"""
diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py
index 29498d8c26..1c49b57188 100644
--- a/src/crawlee/crawlers/_basic/_basic_crawler.py
+++ b/src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -557,18 +557,20 @@ async def get_dataset(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
) -> Dataset:
"""Return the `Dataset` with the given ID or name. If none is provided, return the default one."""
- return await Dataset.open(id=id, name=name)
+ return await Dataset.open(id=id, name=name, alias=alias)
async def get_key_value_store(
self,
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
) -> KeyValueStore:
"""Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS."""
- return await KeyValueStore.open(id=id, name=name)
+ return await KeyValueStore.open(id=id, name=name, alias=alias)
def error_handler(
self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext]
@@ -772,6 +774,7 @@ async def get_data(
self,
dataset_id: str | None = None,
dataset_name: str | None = None,
+ dataset_alias: str | None = None,
**kwargs: Unpack[GetDataKwargs],
) -> DatasetItemsListPage:
"""Retrieve data from a `Dataset`.
@@ -781,13 +784,14 @@ async def get_data(
Args:
dataset_id: The ID of the `Dataset`.
- dataset_name: The name of the `Dataset`.
+ dataset_name: The name of the `Dataset` (global scope, named storage).
+ dataset_alias: The alias of the `Dataset` (run scope, unnamed storage).
kwargs: Keyword arguments to be passed to the `Dataset.get_data()` method.
Returns:
The retrieved data.
"""
- dataset = await Dataset.open(id=dataset_id, name=dataset_name)
+ dataset = await Dataset.open(id=dataset_id, name=dataset_name, alias=dataset_alias)
return await dataset.get_data(**kwargs)
async def export_data(
@@ -795,6 +799,7 @@ async def export_data(
path: str | Path,
dataset_id: str | None = None,
dataset_name: str | None = None,
+ dataset_alias: str | None = None,
) -> None:
"""Export all items from a Dataset to a JSON or CSV file.
@@ -804,10 +809,11 @@ async def export_data(
Args:
path: The destination file path. Must end with '.json' or '.csv'.
- dataset_id: The ID of the Dataset to export from. If None, uses `name` parameter instead.
- dataset_name: The name of the Dataset to export from. If None, uses `id` parameter instead.
+ dataset_id: The ID of the Dataset to export from.
+ dataset_name: The name of the Dataset to export from (global scope, named storage).
+ dataset_alias: The alias of the Dataset to export from (run scope, unnamed storage).
"""
- dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
+ dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias)
path = path if isinstance(path, Path) else Path(path)
dst = path.open('w', newline='')
@@ -824,6 +830,7 @@ async def _push_data(
data: list[dict[str, Any]] | dict[str, Any],
dataset_id: str | None = None,
dataset_name: str | None = None,
+ dataset_alias: str | None = None,
**kwargs: Unpack[PushDataKwargs],
) -> None:
"""Push data to a `Dataset`.
@@ -834,10 +841,11 @@ async def _push_data(
Args:
data: The data to push to the `Dataset`.
dataset_id: The ID of the `Dataset`.
- dataset_name: The name of the `Dataset`.
+ dataset_name: The name of the `Dataset` (global scope, named storage).
+ dataset_alias: The alias of the `Dataset` (run scope, unnamed storage).
kwargs: Keyword arguments to be passed to the `Dataset.push_data()` method.
"""
- dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
+ dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias)
await dataset.push_data(data, **kwargs)
def _should_retry_request(self, context: BasicCrawlingContext, error: Exception) -> bool:
@@ -1226,8 +1234,8 @@ async def _commit_key_value_store_changes(
result: RequestHandlerRunResult, get_kvs: GetKeyValueStoreFromRequestHandlerFunction
) -> None:
"""Store key value store changes recorded in result."""
- for (id, name), changes in result.key_value_store_changes.items():
- store = await get_kvs(id=id, name=name)
+ for (id, name, alias), changes in result.key_value_store_changes.items():
+ store = await get_kvs(id=id, name=name, alias=alias)
for key, value in changes.updates.items():
await store.set_value(key, value.content, value.content_type)
diff --git a/src/crawlee/storage_clients/_base/_storage_client.py b/src/crawlee/storage_clients/_base/_storage_client.py
index 9be648a0e5..fba8507337 100644
--- a/src/crawlee/storage_clients/_base/_storage_client.py
+++ b/src/crawlee/storage_clients/_base/_storage_client.py
@@ -34,6 +34,7 @@ async def create_dataset_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> DatasetClient:
"""Create a dataset client."""
@@ -44,6 +45,7 @@ async def create_kvs_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> KeyValueStoreClient:
"""Create a key-value store client."""
@@ -54,6 +56,7 @@ async def create_rq_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> RequestQueueClient:
"""Create a request queue client."""
diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py
index a3dae65a34..bf9cd08697 100644
--- a/src/crawlee/storage_clients/_file_system/_dataset_client.py
+++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py
@@ -56,7 +56,7 @@ def __init__(
self,
*,
metadata: DatasetMetadata,
- storage_dir: Path,
+ path_to_dataset: Path,
lock: asyncio.Lock,
) -> None:
"""Initialize a new instance.
@@ -65,8 +65,8 @@ def __init__(
"""
self._metadata = metadata
- self._storage_dir = storage_dir
- """The base directory where the storage data are being persisted."""
+ self._path_to_dataset = path_to_dataset
+ """The full path to the dataset directory."""
self._lock = lock
"""A lock to ensure that only one operation is performed at a time."""
@@ -78,10 +78,7 @@ async def get_metadata(self) -> DatasetMetadata:
@property
def path_to_dataset(self) -> Path:
"""The full path to the dataset directory."""
- if self._metadata.name is None:
- return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT
-
- return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name
+ return self._path_to_dataset
@property
def path_to_metadata(self) -> Path:
@@ -94,6 +91,7 @@ async def open(
*,
id: str | None,
name: str | None,
+ alias: str | None,
configuration: Configuration,
) -> FileSystemDatasetClient:
"""Open or create a file system dataset client.
@@ -104,17 +102,23 @@ async def open(
Args:
id: The ID of the dataset to open. If provided, searches for existing dataset by ID.
- name: The name of the dataset to open. If not provided, uses the default dataset.
+ name: The name of the dataset for named (global scope) storages.
+ alias: The alias of the dataset for unnamed (run scope) storages.
configuration: The configuration object containing storage directory settings.
Returns:
An instance for the opened or created storage client.
Raises:
- ValueError: If a dataset with the specified ID is not found, or if metadata is invalid.
+ ValueError: If a dataset with the specified ID is not found, if metadata is invalid,
+ or if more than one of `id`, `name`, or `alias` is provided.
"""
- storage_dir = Path(configuration.storage_dir)
- dataset_base_path = storage_dir / cls._STORAGE_SUBDIR
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
+
+ dataset_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR
if not dataset_base_path.exists():
await asyncio.to_thread(dataset_base_path.mkdir, parents=True, exist_ok=True)
@@ -126,19 +130,19 @@ async def open(
if not dataset_dir.is_dir():
continue
- metadata_path = dataset_dir / METADATA_FILENAME
- if not metadata_path.exists():
+ path_to_metadata = dataset_dir / METADATA_FILENAME
+ if not path_to_metadata.exists():
continue
try:
- file = await asyncio.to_thread(metadata_path.open)
+ file = await asyncio.to_thread(path_to_metadata.open)
try:
file_content = json.load(file)
metadata = DatasetMetadata(**file_content)
if metadata.id == id:
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_dataset=dataset_base_path / dataset_dir,
lock=asyncio.Lock(),
)
await client._update_metadata(update_accessed_at=True)
@@ -152,16 +156,15 @@ async def open(
if not found:
raise ValueError(f'Dataset with ID "{id}" not found')
- # Get a new instance by name.
+ # Get a new instance by name or alias.
else:
- dataset_path = (
- dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else dataset_base_path / name
- )
- metadata_path = dataset_path / METADATA_FILENAME
+ dataset_dir = Path(name) if name else Path(alias) if alias else Path('default')
+ path_to_dataset = dataset_base_path / dataset_dir
+ path_to_metadata = path_to_dataset / METADATA_FILENAME
# If the dataset directory exists, reconstruct the client from the metadata file.
- if dataset_path.exists() and metadata_path.exists():
- file = await asyncio.to_thread(open, metadata_path)
+ if path_to_dataset.exists() and path_to_metadata.exists():
+ file = await asyncio.to_thread(open, path_to_metadata)
try:
file_content = json.load(file)
finally:
@@ -169,11 +172,11 @@ async def open(
try:
metadata = DatasetMetadata(**file_content)
except ValidationError as exc:
- raise ValueError(f'Invalid metadata file for dataset "{name}"') from exc
+ raise ValueError(f'Invalid metadata file for dataset "{name or alias}"') from exc
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_dataset=path_to_dataset,
lock=asyncio.Lock(),
)
@@ -192,7 +195,7 @@ async def open(
)
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_dataset=path_to_dataset,
lock=asyncio.Lock(),
)
await client._update_metadata()
diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py
index bc94980bcc..af32eb468f 100644
--- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py
+++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py
@@ -55,7 +55,7 @@ def __init__(
self,
*,
metadata: KeyValueStoreMetadata,
- storage_dir: Path,
+ path_to_kvs: Path,
lock: asyncio.Lock,
) -> None:
"""Initialize a new instance.
@@ -64,8 +64,8 @@ def __init__(
"""
self._metadata = metadata
- self._storage_dir = storage_dir
- """The base directory where the storage data are being persisted."""
+ self._path_to_kvs = path_to_kvs
+ """The full path to the key-value store directory."""
self._lock = lock
"""A lock to ensure that only one operation is performed at a time."""
@@ -77,10 +77,7 @@ async def get_metadata(self) -> KeyValueStoreMetadata:
@property
def path_to_kvs(self) -> Path:
"""The full path to the key-value store directory."""
- if self._metadata.name is None:
- return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT
-
- return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name
+ return self._path_to_kvs
@property
def path_to_metadata(self) -> Path:
@@ -93,6 +90,7 @@ async def open(
*,
id: str | None,
name: str | None,
+ alias: str | None,
configuration: Configuration,
) -> FileSystemKeyValueStoreClient:
"""Open or create a file system key-value store client.
@@ -103,17 +101,23 @@ async def open(
Args:
id: The ID of the key-value store to open. If provided, searches for existing store by ID.
- name: The name of the key-value store to open. If not provided, uses the default store.
+ name: The name of the key-value store for named (global scope) storages.
+ alias: The alias of the key-value store for unnamed (run scope) storages.
configuration: The configuration object containing storage directory settings.
Returns:
An instance for the opened or created storage client.
Raises:
- ValueError: If a store with the specified ID is not found, or if metadata is invalid.
+ ValueError: If a store with the specified ID is not found, if metadata is invalid,
+ or if more than one of `id`, `name`, or `alias` is provided.
"""
- storage_dir = Path(configuration.storage_dir)
- kvs_base_path = storage_dir / cls._STORAGE_SUBDIR
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
+
+ kvs_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR
if not kvs_base_path.exists():
await asyncio.to_thread(kvs_base_path.mkdir, parents=True, exist_ok=True)
@@ -125,19 +129,19 @@ async def open(
if not kvs_dir.is_dir():
continue
- metadata_path = kvs_dir / METADATA_FILENAME
- if not metadata_path.exists():
+ path_to_metadata = kvs_dir / METADATA_FILENAME
+ if not path_to_metadata.exists():
continue
try:
- file = await asyncio.to_thread(metadata_path.open)
+ file = await asyncio.to_thread(path_to_metadata.open)
try:
file_content = json.load(file)
metadata = KeyValueStoreMetadata(**file_content)
if metadata.id == id:
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_kvs=kvs_base_path / kvs_dir,
lock=asyncio.Lock(),
)
await client._update_metadata(update_accessed_at=True)
@@ -151,14 +155,15 @@ async def open(
if not found:
raise ValueError(f'Key-value store with ID "{id}" not found.')
- # Get a new instance by name.
+ # Get a new instance by name or alias.
else:
- kvs_path = kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else kvs_base_path / name
- metadata_path = kvs_path / METADATA_FILENAME
+ kvs_dir = Path(name) if name else Path(alias) if alias else Path('default')
+ path_to_kvs = kvs_base_path / kvs_dir
+ path_to_metadata = path_to_kvs / METADATA_FILENAME
# If the key-value store directory exists, reconstruct the client from the metadata file.
- if kvs_path.exists() and metadata_path.exists():
- file = await asyncio.to_thread(open, metadata_path)
+ if path_to_kvs.exists() and path_to_metadata.exists():
+ file = await asyncio.to_thread(open, path_to_metadata)
try:
file_content = json.load(file)
finally:
@@ -166,11 +171,11 @@ async def open(
try:
metadata = KeyValueStoreMetadata(**file_content)
except ValidationError as exc:
- raise ValueError(f'Invalid metadata file for key-value store "{name}"') from exc
+ raise ValueError(f'Invalid metadata file for key-value store "{name or alias}"') from exc
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_kvs=path_to_kvs,
lock=asyncio.Lock(),
)
@@ -188,7 +193,7 @@ async def open(
)
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_kvs=path_to_kvs,
lock=asyncio.Lock(),
)
await client._update_metadata()
diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py
index c606af2aa5..a02773c1b7 100644
--- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py
+++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py
@@ -89,7 +89,7 @@ def __init__(
self,
*,
metadata: RequestQueueMetadata,
- storage_dir: Path,
+ path_to_rq: Path,
lock: asyncio.Lock,
) -> None:
"""Initialize a new instance.
@@ -98,8 +98,8 @@ def __init__(
"""
self._metadata = metadata
- self._storage_dir = storage_dir
- """The base directory where the storage data are being persisted."""
+ self._path_to_rq = path_to_rq
+ """The full path to the request queue directory."""
self._lock = lock
"""A lock to ensure that only one operation is performed at a time."""
@@ -129,10 +129,7 @@ async def get_metadata(self) -> RequestQueueMetadata:
@property
def path_to_rq(self) -> Path:
"""The full path to the request queue directory."""
- if self._metadata.name is None:
- return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT
-
- return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name
+ return self._path_to_rq
@property
def path_to_metadata(self) -> Path:
@@ -145,6 +142,7 @@ async def open(
*,
id: str | None,
name: str | None,
+ alias: str | None,
configuration: Configuration,
) -> FileSystemRequestQueueClient:
"""Open or create a file system request queue client.
@@ -155,17 +153,23 @@ async def open(
Args:
id: The ID of the request queue to open. If provided, searches for existing queue by ID.
- name: The name of the request queue to open. If not provided, uses the default queue.
+ name: The name of the request queue for named (global scope) storages.
+ alias: The alias of the request queue for unnamed (run scope) storages.
configuration: The configuration object containing storage directory settings.
Returns:
An instance for the opened or created storage client.
Raises:
- ValueError: If a queue with the specified ID is not found, or if metadata is invalid.
+ ValueError: If a queue with the specified ID is not found, if metadata is invalid,
+ or if more than one of `id`, `name`, or `alias` is provided.
"""
- storage_dir = Path(configuration.storage_dir)
- rq_base_path = storage_dir / cls._STORAGE_SUBDIR
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
+
+ rq_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR
if not rq_base_path.exists():
await asyncio.to_thread(rq_base_path.mkdir, parents=True, exist_ok=True)
@@ -177,12 +181,12 @@ async def open(
if not rq_dir.is_dir():
continue
- metadata_path = rq_dir / METADATA_FILENAME
- if not metadata_path.exists():
+ path_to_metadata = rq_dir / METADATA_FILENAME
+ if not path_to_metadata.exists():
continue
try:
- file = await asyncio.to_thread(metadata_path.open)
+ file = await asyncio.to_thread(path_to_metadata.open)
try:
file_content = json.load(file)
metadata = RequestQueueMetadata(**file_content)
@@ -190,7 +194,7 @@ async def open(
if metadata.id == id:
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_rq=rq_base_path / rq_dir,
lock=asyncio.Lock(),
)
await client._state.initialize()
@@ -206,14 +210,15 @@ async def open(
if not found:
raise ValueError(f'Request queue with ID "{id}" not found')
- # Open an existing RQ by its name, or create a new one if not found.
+ # Open an existing RQ by its name or alias, or create a new one if not found.
else:
- rq_path = rq_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else rq_base_path / name
- metadata_path = rq_path / METADATA_FILENAME
+ rq_dir = Path(name) if name else Path(alias) if alias else Path('default')
+ path_to_rq = rq_base_path / rq_dir
+ path_to_metadata = path_to_rq / METADATA_FILENAME
# If the RQ directory exists, reconstruct the client from the metadata file.
- if rq_path.exists() and metadata_path.exists():
- file = await asyncio.to_thread(open, metadata_path)
+ if path_to_rq.exists() and path_to_metadata.exists():
+ file = await asyncio.to_thread(open, path_to_metadata)
try:
file_content = json.load(file)
finally:
@@ -221,13 +226,11 @@ async def open(
try:
metadata = RequestQueueMetadata(**file_content)
except ValidationError as exc:
- raise ValueError(f'Invalid metadata file for request queue "{name}"') from exc
-
- metadata.name = name
+ raise ValueError(f'Invalid metadata file for request queue "{name or alias}"') from exc
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_rq=path_to_rq,
lock=asyncio.Lock(),
)
@@ -251,7 +254,7 @@ async def open(
)
client = cls(
metadata=metadata,
- storage_dir=storage_dir,
+ path_to_rq=path_to_rq,
lock=asyncio.Lock(),
)
await client._state.initialize()
diff --git a/src/crawlee/storage_clients/_file_system/_storage_client.py b/src/crawlee/storage_clients/_file_system/_storage_client.py
index 86903ea4e7..94f183db2a 100644
--- a/src/crawlee/storage_clients/_file_system/_storage_client.py
+++ b/src/crawlee/storage_clients/_file_system/_storage_client.py
@@ -35,10 +35,11 @@ async def create_dataset_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> FileSystemDatasetClient:
configuration = configuration or Configuration.get_global_configuration()
- client = await FileSystemDatasetClient.open(id=id, name=name, configuration=configuration)
+ client = await FileSystemDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration)
await self._purge_if_needed(client, configuration)
return client
@@ -48,10 +49,11 @@ async def create_kvs_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> FileSystemKeyValueStoreClient:
configuration = configuration or Configuration.get_global_configuration()
- client = await FileSystemKeyValueStoreClient.open(id=id, name=name, configuration=configuration)
+ client = await FileSystemKeyValueStoreClient.open(id=id, name=name, alias=alias, configuration=configuration)
await self._purge_if_needed(client, configuration)
return client
@@ -61,9 +63,10 @@ async def create_rq_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> FileSystemRequestQueueClient:
configuration = configuration or Configuration.get_global_configuration()
- client = await FileSystemRequestQueueClient.open(id=id, name=name, configuration=configuration)
+ client = await FileSystemRequestQueueClient.open(id=id, name=name, alias=alias, configuration=configuration)
await self._purge_if_needed(client, configuration)
return client
diff --git a/src/crawlee/storage_clients/_memory/_dataset_client.py b/src/crawlee/storage_clients/_memory/_dataset_client.py
index 8426319acf..15b3a17e45 100644
--- a/src/crawlee/storage_clients/_memory/_dataset_client.py
+++ b/src/crawlee/storage_clients/_memory/_dataset_client.py
@@ -53,6 +53,7 @@ async def open(
*,
id: str | None,
name: str | None,
+ alias: str | None,
) -> MemoryDatasetClient:
"""Open or create a new memory dataset client.
@@ -60,14 +61,26 @@ async def open(
datasets don't check for existing datasets with the same name or ID since all data exists only in memory
and is lost when the process terminates.
+ The alias has no effect in the memory storage client implementation, because data are not persisted
+ and unnamed (temporary) storages are therefore supported by default.
+
Args:
id: The ID of the dataset. If not provided, a random ID will be generated.
- name: The name of the dataset. If not provided, the dataset will be unnamed.
+ name: The name of the dataset for named (global scope) storages.
+ alias: The alias of the dataset for unnamed (run scope) storages.
Returns:
An instance for the opened or created storage client.
+
+ Raises:
+ ValueError: If more than one of `id`, `name`, or `alias` is provided.
"""
- # Otherwise create a new dataset
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
+
+ # Create a new dataset
dataset_id = id or crypto_random_object_id()
now = datetime.now(timezone.utc)
diff --git a/src/crawlee/storage_clients/_memory/_key_value_store_client.py b/src/crawlee/storage_clients/_memory/_key_value_store_client.py
index 7dacf6d95d..0cf70dbbb8 100644
--- a/src/crawlee/storage_clients/_memory/_key_value_store_client.py
+++ b/src/crawlee/storage_clients/_memory/_key_value_store_client.py
@@ -51,6 +51,7 @@ async def open(
*,
id: str | None,
name: str | None,
+ alias: str | None,
) -> MemoryKeyValueStoreClient:
"""Open or create a new memory key-value store client.
@@ -58,14 +59,26 @@ async def open(
memory KVS don't check for existing stores with the same name or ID since all data exists only in memory
and is lost when the process terminates.
+ The alias has no effect in the memory storage client implementation, because data are not persisted
+ and unnamed (temporary) storages are therefore supported by default.
+
Args:
id: The ID of the key-value store. If not provided, a random ID will be generated.
- name: The name of the key-value store. If not provided, the store will be unnamed.
+ name: The name of the key-value store for named (global scope) storages.
+ alias: The alias of the key-value store for unnamed (run scope) storages.
Returns:
An instance for the opened or created storage client.
+
+ Raises:
+ ValueError: If more than one of `id`, `name`, or `alias` is provided.
"""
- # Otherwise create a new key-value store
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
+
+ # Create a new key-value store
store_id = id or crypto_random_object_id()
now = datetime.now(timezone.utc)
diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py
index 2e48eceba7..e3bcfc9d6e 100644
--- a/src/crawlee/storage_clients/_memory/_request_queue_client.py
+++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py
@@ -63,6 +63,7 @@ async def open(
*,
id: str | None,
name: str | None,
+ alias: str | None,
) -> MemoryRequestQueueClient:
"""Open or create a new memory request queue client.
@@ -70,14 +71,26 @@ async def open(
memory queues don't check for existing queues with the same name or ID since all data exists only
in memory and is lost when the process terminates.
+ The alias has no effect in the memory storage client implementation, because data are not persisted
+ and unnamed (temporary) storages are therefore supported by default.
+
Args:
id: The ID of the request queue. If not provided, a random ID will be generated.
- name: The name of the request queue. If not provided, the queue will be unnamed.
+ name: The name of the request queue for named (global scope) storages.
+ alias: The alias of the request queue for unnamed (run scope) storages.
Returns:
An instance for the opened or created storage client.
+
+ Raises:
+ ValueError: If more than one of `id`, `name`, or `alias` is provided.
"""
- # Otherwise create a new queue
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
+
+ # Create a new queue
queue_id = id or crypto_random_object_id()
now = datetime.now(timezone.utc)
diff --git a/src/crawlee/storage_clients/_memory/_storage_client.py b/src/crawlee/storage_clients/_memory/_storage_client.py
index f4ac73e489..fa8ff3589e 100644
--- a/src/crawlee/storage_clients/_memory/_storage_client.py
+++ b/src/crawlee/storage_clients/_memory/_storage_client.py
@@ -33,10 +33,11 @@ async def create_dataset_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> MemoryDatasetClient:
configuration = configuration or Configuration.get_global_configuration()
- client = await MemoryDatasetClient.open(id=id, name=name)
+ client = await MemoryDatasetClient.open(id=id, name=name, alias=alias)
await self._purge_if_needed(client, configuration)
return client
@@ -46,10 +47,11 @@ async def create_kvs_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> MemoryKeyValueStoreClient:
configuration = configuration or Configuration.get_global_configuration()
- client = await MemoryKeyValueStoreClient.open(id=id, name=name)
+ client = await MemoryKeyValueStoreClient.open(id=id, name=name, alias=alias)
await self._purge_if_needed(client, configuration)
return client
@@ -59,9 +61,10 @@ async def create_rq_client(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
) -> MemoryRequestQueueClient:
configuration = configuration or Configuration.get_global_configuration()
- client = await MemoryRequestQueueClient.open(id=id, name=name)
+ client = await MemoryRequestQueueClient.open(id=id, name=name, alias=alias)
await self._purge_if_needed(client, configuration)
return client
diff --git a/src/crawlee/storages/_base.py b/src/crawlee/storages/_base.py
index c63f1dda37..c0c4ee100a 100644
--- a/src/crawlee/storages/_base.py
+++ b/src/crawlee/storages/_base.py
@@ -36,6 +36,7 @@ async def open(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
storage_client: StorageClient | None = None,
) -> Storage:
@@ -43,7 +44,8 @@ async def open(
Args:
id: The storage ID.
- name: The storage name.
+ name: The storage name (global scope, persists across runs).
+ alias: The storage alias (run scope, creates unnamed storage).
configuration: Configuration object used during the storage creation or restoration process.
storage_client: Underlying storage client to use. If not provided, the default global storage client
from the service locator will be used.
diff --git a/src/crawlee/storages/_dataset.py b/src/crawlee/storages/_dataset.py
index 36fbea8a7a..3e4b4c4981 100644
--- a/src/crawlee/storages/_dataset.py
+++ b/src/crawlee/storages/_dataset.py
@@ -100,6 +100,7 @@ async def open(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
storage_client: StorageClient | None = None,
) -> Dataset:
@@ -110,6 +111,7 @@ async def open(
cls,
id=id,
name=name,
+ alias=alias,
configuration=configuration,
client_opener=storage_client.create_dataset_client,
)
diff --git a/src/crawlee/storages/_key_value_store.py b/src/crawlee/storages/_key_value_store.py
index 5297925a37..ee38166f23 100644
--- a/src/crawlee/storages/_key_value_store.py
+++ b/src/crawlee/storages/_key_value_store.py
@@ -112,6 +112,7 @@ async def open(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
storage_client: StorageClient | None = None,
) -> KeyValueStore:
@@ -122,6 +123,7 @@ async def open(
cls,
id=id,
name=name,
+ alias=alias,
configuration=configuration,
client_opener=storage_client.create_kvs_client,
)
diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py
index 068b5135f0..0141a52e06 100644
--- a/src/crawlee/storages/_request_queue.py
+++ b/src/crawlee/storages/_request_queue.py
@@ -118,6 +118,7 @@ async def open(
*,
id: str | None = None,
name: str | None = None,
+ alias: str | None = None,
configuration: Configuration | None = None,
storage_client: StorageClient | None = None,
) -> RequestQueue:
@@ -128,6 +129,7 @@ async def open(
cls,
id=id,
name=name,
+ alias=alias,
configuration=configuration,
client_opener=storage_client.create_rq_client,
)
diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py
index 130a2eec63..ea86ac7311 100644
--- a/src/crawlee/storages/_storage_instance_manager.py
+++ b/src/crawlee/storages/_storage_instance_manager.py
@@ -1,7 +1,7 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable
-from typing import TYPE_CHECKING, TypeVar, cast
+from typing import TYPE_CHECKING, TypeVar
from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient
@@ -26,6 +26,8 @@ class StorageInstanceManager:
and provides a unified interface for opening and managing storage instances.
"""
+ _DEFAULT_STORAGE = 'default'
+
def __init__(self) -> None:
self._cache_by_id = dict[type[Storage], dict[str, Storage]]()
"""Cache for storage instances by ID, separated by storage type."""
@@ -33,6 +35,9 @@ def __init__(self) -> None:
self._cache_by_name = dict[type[Storage], dict[str, Storage]]()
"""Cache for storage instances by name, separated by storage type."""
+ self._cache_by_alias = dict[type[Storage], dict[str, Storage]]()
+ """Cache for storage instances by alias, separated by storage type."""
+
self._default_instances = dict[type[Storage], Storage]()
"""Cache for default instances of each storage type."""
@@ -42,6 +47,7 @@ async def open_storage_instance(
*,
id: str | None,
name: str | None,
+ alias: str | None,
configuration: Configuration,
client_opener: ClientOpener,
) -> T:
@@ -50,7 +56,8 @@ async def open_storage_instance(
Args:
cls: The storage class to instantiate.
id: Storage ID.
- name: Storage name.
+ name: Storage name (global scope, persists across runs).
+ alias: Storage alias (run scope, creates unnamed storage).
configuration: Configuration object.
client_opener: Function to create the storage client.
@@ -58,14 +65,18 @@ async def open_storage_instance(
The storage instance.
Raises:
- ValueError: If both id and name are specified.
+ ValueError: If multiple parameters out of `id`, `name`, and `alias` are specified.
"""
- if id and name:
- raise ValueError('Only one of "id" or "name" can be specified, not both.')
+ # Validate input parameters.
+ specified_params = sum(1 for param in [id, name, alias] if param is not None)
+ if specified_params > 1:
+ raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
- # Check for default instance
- if id is None and name is None and cls in self._default_instances:
- return cast('T', self._default_instances[cls])
+ # Auto-set alias='default' when no parameters are specified.
+ # The default unnamed storage is the same as the storage opened with alias='default'.
+ if specified_params == 0:
+ alias = self._DEFAULT_STORAGE
+ specified_params = 1
# Check cache
if id is not None:
@@ -82,8 +93,38 @@ async def open_storage_instance(
if isinstance(cached_instance, cls):
return cached_instance
+ if alias is not None:
+ type_cache_by_alias = self._cache_by_alias.get(cls, {})
+ if alias in type_cache_by_alias:
+ cached_instance = type_cache_by_alias[alias]
+ if isinstance(cached_instance, cls):
+ return cached_instance
+
+ # Check for conflicts between named and alias storages
+ if name is not None:
+ # Check if there's already an alias storage with the same identifier
+ type_cache_by_alias = self._cache_by_alias.get(cls, {})
+ if name in type_cache_by_alias:
+ raise ValueError(
+ f'Cannot create named storage "{name}" because an alias storage with the same name already exists. '
+ f'Use a different name or drop the existing alias storage first.'
+ )
+
+ if alias is not None:
+ # Check if there's already a named storage with the same identifier
+ type_cache_by_name = self._cache_by_name.get(cls, {})
+ if alias in type_cache_by_name:
+ raise ValueError(
+ f'Cannot create alias storage "{alias}" because a named storage with the same name already exists. '
+ f'Use a different alias or drop the existing named storage first.'
+ )
+
# Create new instance
- client = await client_opener(id=id, name=name, configuration=configuration)
+ # Pass the correct parameters to the storage client
+ if alias is not None:
+ client = await client_opener(id=id, name=None, alias=alias, configuration=configuration)
+ else:
+ client = await client_opener(id=id, name=name, configuration=configuration)
metadata = await client.get_metadata()
instance = cls(client, metadata.id, metadata.name) # type: ignore[call-arg]
@@ -92,14 +133,13 @@ async def open_storage_instance(
# Cache the instance
type_cache_by_id = self._cache_by_id.setdefault(cls, {})
type_cache_by_name = self._cache_by_name.setdefault(cls, {})
+ type_cache_by_alias = self._cache_by_alias.setdefault(cls, {})
type_cache_by_id[instance.id] = instance
if instance_name is not None:
type_cache_by_name[instance_name] = instance
-
- # Set as default if no id/name specified
- if id is None and name is None:
- self._default_instances[cls] = instance
+ if alias is not None:
+ type_cache_by_alias[alias] = instance
return instance
@@ -122,6 +162,12 @@ def remove_from_cache(self, storage_instance: Storage) -> None:
if storage_instance.name in type_cache_by_name:
del type_cache_by_name[storage_instance.name]
+ # Remove from alias cache - need to search by instance since alias is not stored on the instance
+ type_cache_by_alias = self._cache_by_alias.get(storage_type, {})
+ aliases_to_remove = [alias for alias, instance in type_cache_by_alias.items() if instance is storage_instance]
+ for alias in aliases_to_remove:
+ del type_cache_by_alias[alias]
+
# Remove from default instances
if storage_type in self._default_instances and self._default_instances[storage_type] is storage_instance:
del self._default_instances[storage_type]
@@ -130,4 +176,5 @@ def clear_cache(self) -> None:
"""Clear all cached storage instances."""
self._cache_by_id.clear()
self._cache_by_name.clear()
+ self._cache_by_alias.clear()
self._default_instances.clear()
diff --git a/tests/unit/storages/test_dataset.py b/tests/unit/storages/test_dataset.py
index a57d525d56..c8ce6daf01 100644
--- a/tests/unit/storages/test_dataset.py
+++ b/tests/unit/storages/test_dataset.py
@@ -7,6 +7,7 @@
import pytest
+from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
from crawlee.storages import Dataset, KeyValueStore
@@ -178,7 +179,7 @@ async def test_open_with_id_and_name(
configuration: Configuration,
) -> None:
"""Test that open() raises an error when both id and name are provided."""
- with pytest.raises(ValueError, match=r'Only one of "id" or "name" can be specified'):
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
await Dataset.open(
id='some-id',
name='some-name',
@@ -582,3 +583,469 @@ async def test_purge(
# Clean up
await dataset.drop()
+
+
+async def test_open_with_alias(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test opening datasets with alias parameter for NDU functionality."""
+ # Create datasets with different aliases
+ dataset_1 = await Dataset.open(
+ alias='test_alias_1',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ dataset_2 = await Dataset.open(
+ alias='test_alias_2',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify they have different IDs but no names (unnamed)
+ assert dataset_1.id != dataset_2.id
+ assert dataset_1.name is None
+ assert dataset_2.name is None
+
+ # Add different data to each
+ await dataset_1.push_data({'source': 'alias_1', 'value': 1})
+ await dataset_2.push_data({'source': 'alias_2', 'value': 2})
+
+ # Verify data isolation
+ data_1 = await dataset_1.get_data()
+ data_2 = await dataset_2.get_data()
+
+ assert data_1.count == 1
+ assert data_2.count == 1
+ assert data_1.items[0]['source'] == 'alias_1'
+ assert data_2.items[0]['source'] == 'alias_2'
+
+ # Clean up
+ await dataset_1.drop()
+ await dataset_2.drop()
+
+
+async def test_alias_caching(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that datasets with same alias return same instance (cached)."""
+ # Open dataset with alias
+ dataset_1 = await Dataset.open(
+ alias='cache_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Open again with same alias
+ dataset_2 = await Dataset.open(
+ alias='cache_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be same instance
+ assert dataset_1 is dataset_2
+ assert dataset_1.id == dataset_2.id
+
+ # Clean up
+ await dataset_1.drop()
+
+
+async def test_alias_with_id_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing both alias and id raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await Dataset.open(
+ id='some-id',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_name_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing both alias and name raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await Dataset.open(
+ name='some-name',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_all_parameters_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing id, name, and alias raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await Dataset.open(
+ id='some-id',
+ name='some-name',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_special_characters(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test alias functionality with special characters."""
+ special_aliases = [
+ 'alias-with-dashes',
+ 'alias_with_underscores',
+ 'alias.with.dots',
+ 'alias123with456numbers',
+ 'CamelCaseAlias',
+ ]
+
+ datasets = []
+ for alias in special_aliases:
+ dataset = await Dataset.open(
+ alias=alias,
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ datasets.append(dataset)
+
+ # Add data with the alias as identifier
+ await dataset.push_data({'alias_used': alias, 'test': 'special_chars'})
+
+ # Verify all work correctly
+ for i, dataset in enumerate(datasets):
+ data = await dataset.get_data()
+ assert data.count == 1
+ assert data.items[0]['alias_used'] == special_aliases[i]
+
+ # Clean up
+ for dataset in datasets:
+ await dataset.drop()
+
+
+async def test_named_vs_alias_conflict_detection() -> None:
+ """Test that conflicts between named and alias storages are detected."""
+ # Test 1: Create named storage first, then try alias with same name
+ named_dataset = await Dataset.open(name='conflict_test')
+ assert named_dataset.name == 'conflict_test'
+
+ # Try to create alias with same name - should raise error
+ with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'):
+ await Dataset.open(alias='conflict_test')
+
+ # Clean up
+ await named_dataset.drop()
+
+ # Test 2: Create alias first, then try named with same name
+ alias_dataset = await Dataset.open(alias='conflict_test2')
+ assert alias_dataset.name is None # Alias storages have no name
+
+ # Try to create named with same name - should raise error
+ with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'):
+ await Dataset.open(name='conflict_test2')
+
+ # Clean up
+ await alias_dataset.drop()
+
+
+async def test_alias_parameter(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test dataset creation and operations with alias parameter."""
+ # Create dataset with alias
+ alias_dataset = await Dataset.open(
+ alias='test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify alias dataset properties
+ assert alias_dataset.id is not None
+ assert alias_dataset.name is None # Alias storages should be unnamed
+
+ # Test data operations
+ await alias_dataset.push_data({'type': 'alias', 'value': 1})
+ data = await alias_dataset.get_data()
+ assert data.count == 1
+ assert data.items[0]['type'] == 'alias'
+
+ await alias_dataset.drop()
+
+
+async def test_alias_vs_named_isolation(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that alias and named datasets with same identifier are isolated."""
+ # Create named dataset
+ named_dataset = await Dataset.open(
+ name='test_identifier',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify named dataset
+ assert named_dataset.name == 'test_identifier'
+ await named_dataset.push_data({'type': 'named'})
+
+ # Clean up named dataset first
+ await named_dataset.drop()
+
+ # Now create alias dataset with same identifier (should work after cleanup)
+ alias_dataset = await Dataset.open(
+ alias='test_identifier',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be different instance
+ assert alias_dataset.name is None
+ await alias_dataset.push_data({'type': 'alias'})
+
+ # Verify alias data
+ alias_data = await alias_dataset.get_data()
+ assert alias_data.items[0]['type'] == 'alias'
+
+ await alias_dataset.drop()
+
+
+async def test_default_vs_alias_default_equivalence(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that default dataset and alias='default' are equivalent."""
+ # Open default dataset
+ default_dataset = await Dataset.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Open alias='default' dataset
+ alias_default_dataset = await Dataset.open(
+ alias='default',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be the same
+ assert default_dataset.id == alias_default_dataset.id
+ assert default_dataset.name is None
+ assert alias_default_dataset.name is None
+
+ # Data should be shared
+ await default_dataset.push_data({'source': 'default'})
+ data = await alias_default_dataset.get_data()
+ assert data.items[0]['source'] == 'default'
+
+ await default_dataset.drop()
+
+
+async def test_multiple_alias_isolation(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that different aliases create separate datasets."""
+ datasets = []
+
+ for i in range(3):
+ dataset = await Dataset.open(
+ alias=f'alias_{i}',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ await dataset.push_data({'alias': f'alias_{i}', 'index': i})
+ datasets.append(dataset)
+
+ # All should be different
+ for i in range(3):
+ for j in range(i + 1, 3):
+ assert datasets[i].id != datasets[j].id
+
+ # Verify data isolation
+ for i, dataset in enumerate(datasets):
+ data = await dataset.get_data()
+ assert data.items[0]['alias'] == f'alias_{i}'
+ await dataset.drop()
+
+
+async def test_purge_on_start_enabled(storage_client: StorageClient) -> None:
+ """Test purge behavior when purge_on_start=True: named storages retain data, unnamed storages are purged."""
+
+ # Skip this test for memory storage since it doesn't persist data between client instances.
+ if storage_client.__class__ == MemoryStorageClient:
+ pytest.skip('Memory storage does not persist data between client instances.')
+
+ configuration = Configuration(purge_on_start=True)
+
+ # First, create all storage types with purge enabled and add data.
+ default_dataset = await Dataset.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ alias_dataset = await Dataset.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ named_dataset = await Dataset.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ await default_dataset.push_data({'type': 'default', 'data': 'should_be_purged'})
+ await alias_dataset.push_data({'type': 'alias', 'data': 'should_be_purged'})
+ await named_dataset.push_data({'type': 'named', 'data': 'should_persist'})
+
+ # Verify data was added
+ default_data = await default_dataset.get_data()
+ alias_data = await alias_dataset.get_data()
+ named_data = await named_dataset.get_data()
+
+ assert len(default_data.items) == 1
+ assert len(alias_data.items) == 1
+ assert len(named_data.items) == 1
+
+ # Verify that default and alias storages are unnamed
+ default_metadata = await default_dataset.get_metadata()
+ alias_metadata = await alias_dataset.get_metadata()
+ named_metadata = await named_dataset.get_metadata()
+
+ assert default_metadata.name is None
+ assert alias_metadata.name is None
+ assert named_metadata.name == 'purge_test_named'
+
+ # Clear storage cache to simulate "reopening" storages
+ service_locator.storage_instance_manager.clear_cache()
+
+ # Now "reopen" all storages
+ default_dataset_2 = await Dataset.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ alias_dataset_2 = await Dataset.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ named_dataset_2 = await Dataset.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Check the data after purge
+ default_data_after = await default_dataset_2.get_data()
+ alias_data_after = await alias_dataset_2.get_data()
+ named_data_after = await named_dataset_2.get_data()
+
+ # Unnamed storages (alias and default) should be purged (data removed)
+ assert len(default_data_after.items) == 0
+ assert len(alias_data_after.items) == 0
+
+ # Named storage should retain data (not purged)
+ assert len(named_data_after.items) == 1
+
+ # Clean up
+ await named_dataset_2.drop()
+ await alias_dataset_2.drop()
+ await default_dataset_2.drop()
+
+
+async def test_purge_on_start_disabled(storage_client: StorageClient) -> None:
+ """Test purge behavior when purge_on_start=False: all storages retain data regardless of type."""
+
+ # Skip this test for memory storage since it doesn't persist data between client instances.
+ if storage_client.__class__ == MemoryStorageClient:
+ pytest.skip('Memory storage does not persist data between client instances.')
+
+ configuration = Configuration(purge_on_start=False)
+
+ # First, create all storage types with purge disabled and add data.
+ default_dataset = await Dataset.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ alias_dataset = await Dataset.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ named_dataset = await Dataset.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ await default_dataset.push_data({'type': 'default', 'data': 'should_persist'})
+ await alias_dataset.push_data({'type': 'alias', 'data': 'should_persist'})
+ await named_dataset.push_data({'type': 'named', 'data': 'should_persist'})
+
+ # Verify data was added
+ default_data = await default_dataset.get_data()
+ alias_data = await alias_dataset.get_data()
+ named_data = await named_dataset.get_data()
+
+ assert len(default_data.items) == 1
+ assert len(alias_data.items) == 1
+ assert len(named_data.items) == 1
+
+ # Verify that default and alias storages are unnamed
+ default_metadata = await default_dataset.get_metadata()
+ alias_metadata = await alias_dataset.get_metadata()
+ named_metadata = await named_dataset.get_metadata()
+
+ assert default_metadata.name is None
+ assert alias_metadata.name is None
+ assert named_metadata.name == 'purge_test_named'
+
+ # Clear storage cache to simulate "reopening" storages
+ service_locator.storage_instance_manager.clear_cache()
+
+ # Now "reopen" all storages
+ default_dataset_2 = await Dataset.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ alias_dataset_2 = await Dataset.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ named_dataset_2 = await Dataset.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Check the data after reopen
+ default_data_after = await default_dataset_2.get_data()
+ alias_data_after = await alias_dataset_2.get_data()
+ named_data_after = await named_dataset_2.get_data()
+
+ # All storages should retain data (not purged)
+ assert len(default_data_after.items) == 1
+ assert len(alias_data_after.items) == 1
+ assert len(named_data_after.items) == 1
+
+ assert default_data_after.items[0]['data'] == 'should_persist'
+ assert alias_data_after.items[0]['data'] == 'should_persist'
+ assert named_data_after.items[0]['data'] == 'should_persist'
+
+ # Clean up
+ await default_dataset_2.drop()
+ await alias_dataset_2.drop()
+ await named_dataset_2.drop()
diff --git a/tests/unit/storages/test_key_value_store.py b/tests/unit/storages/test_key_value_store.py
index 122de8b157..1b2f9209ec 100644
--- a/tests/unit/storages/test_key_value_store.py
+++ b/tests/unit/storages/test_key_value_store.py
@@ -8,6 +8,7 @@
import pytest
+from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
from crawlee.storages import KeyValueStore
@@ -94,7 +95,7 @@ async def test_open_with_id_and_name(
configuration: Configuration,
) -> None:
"""Test that open() raises an error when both id and name are provided."""
- with pytest.raises(ValueError, match=r'Only one of "id" or "name" can be specified'):
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
await KeyValueStore.open(
id='some-id',
name='some-name',
@@ -598,3 +599,504 @@ async def test_record_exists_after_purge(kvs: KeyValueStore) -> None:
# Should no longer exist
assert await kvs.record_exists('key1') is False
assert await kvs.record_exists('key2') is False
+
+
+async def test_open_with_alias(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test opening key-value stores with alias parameter for NDU functionality."""
+ # Create key-value stores with different aliases
+ kvs_1 = await KeyValueStore.open(
+ alias='test_alias_1',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ kvs_2 = await KeyValueStore.open(
+ alias='test_alias_2',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify they have different IDs but no names (unnamed)
+ assert kvs_1.id != kvs_2.id
+ assert kvs_1.name is None
+ assert kvs_2.name is None
+
+ # Add different data to each
+ await kvs_1.set_value('source', 'alias_1')
+ await kvs_2.set_value('source', 'alias_2')
+
+ # Verify data isolation
+ value_1 = await kvs_1.get_value('source')
+ value_2 = await kvs_2.get_value('source')
+
+ assert value_1 == 'alias_1'
+ assert value_2 == 'alias_2'
+
+ # Clean up
+ await kvs_1.drop()
+ await kvs_2.drop()
+
+
+async def test_alias_caching(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that key-value stores with same alias return same instance (cached)."""
+ # Open kvs with alias
+ kvs_1 = await KeyValueStore.open(
+ alias='cache_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Open again with same alias
+ kvs_2 = await KeyValueStore.open(
+ alias='cache_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be same instance
+ assert kvs_1 is kvs_2
+ assert kvs_1.id == kvs_2.id
+
+ # Clean up
+ await kvs_1.drop()
+
+
+async def test_alias_with_id_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing both alias and id raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await KeyValueStore.open(
+ id='some-id',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_name_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing both alias and name raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await KeyValueStore.open(
+ name='some-name',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_special_characters(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test alias functionality with special characters."""
+ special_aliases = [
+ 'alias-with-dashes',
+ 'alias_with_underscores',
+ 'alias.with.dots',
+ 'alias123with456numbers',
+ 'CamelCaseAlias',
+ ]
+
+ stores = []
+ for alias in special_aliases:
+ kvs = await KeyValueStore.open(
+ alias=alias,
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ stores.append(kvs)
+
+ # Add data with the alias as identifier
+ await kvs.set_value('alias_used', alias)
+ await kvs.set_value('test', 'special_chars')
+
+ # Verify all work correctly
+ for i, kvs in enumerate(stores):
+ assert await kvs.get_value('alias_used') == special_aliases[i]
+ assert await kvs.get_value('test') == 'special_chars'
+
+ # Clean up
+ for kvs in stores:
+ await kvs.drop()
+
+
+async def test_alias_key_operations(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that key operations work correctly with alias stores."""
+ kvs = await KeyValueStore.open(
+ alias='key_ops_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Test setting multiple keys
+ test_data = {
+ 'key1': {'data': 'value1', 'number': 1},
+ 'key2': 'simple string value',
+ 'key3': [1, 2, 3, 4, 5],
+ 'key4': None,
+ }
+
+ for key, value in test_data.items():
+ await kvs.set_value(key, value)
+
+ # Test getting all keys
+ keys = await kvs.list_keys()
+ key_names = [k.key for k in keys]
+ assert len(keys) == 4
+ for key in test_data:
+ assert key in key_names
+
+ # Test record_exists
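+ # (the None stored under 'key4' is still expected to count as an existing record)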
+ for key in test_data:
+ assert await kvs.record_exists(key) is True
+ assert await kvs.record_exists('nonexistent') is False
+
+ # Test iteration
+ collected_keys = [key async for key in kvs.iterate_keys()]
+ assert len(collected_keys) == 4
+
+ # Test deletion
+ await kvs.delete_value('key2')
+ assert await kvs.record_exists('key2') is False
+ assert await kvs.get_value('key2') is None
+
+ # Verify other keys still exist
+ remaining_keys = await kvs.list_keys()
+ assert len(remaining_keys) == 3
+
+ # Clean up
+ await kvs.drop()
+
+
+async def test_named_vs_alias_conflict_detection() -> None:
+ """Test that conflicts between named and alias storages are detected."""
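+ # No storage_client or configuration is passed here, so the defaults from the service locator are used.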
+ # Test 1: Create named storage first, then try alias with same name
+ named_kvs = await KeyValueStore.open(name='conflict_test')
+ assert named_kvs.name == 'conflict_test'
+
+ # Try to create alias with same name - should raise error
+ with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'):
+ await KeyValueStore.open(alias='conflict_test')
+
+ # Clean up
+ await named_kvs.drop()
+
+ # Test 2: Create alias first, then try named with same name
+ alias_kvs = await KeyValueStore.open(alias='conflict_test2')
+ assert alias_kvs.name is None # Alias storages have no name
+
+ # Try to create named with same name - should raise error
+ with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'):
+ await KeyValueStore.open(name='conflict_test2')
+
+ # Clean up
+ await alias_kvs.drop()
+
+ # Test 3: Different names should work fine
+ named_kvs_ok = await KeyValueStore.open(name='different_name')
+ alias_kvs_ok = await KeyValueStore.open(alias='different_alias')
+
+ assert named_kvs_ok.name == 'different_name'
+ assert alias_kvs_ok.name is None
+
+ # Clean up
+ await named_kvs_ok.drop()
+ await alias_kvs_ok.drop()
+
+
+async def test_alias_parameter(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test key-value store creation and operations with alias parameter."""
+ # Create kvs with alias
+ alias_kvs = await KeyValueStore.open(
+ alias='test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify alias kvs properties
+ assert alias_kvs.id is not None
+ assert alias_kvs.name is None # Alias storages should be unnamed
+
+ # Test data operations
+ await alias_kvs.set_value('test_key', {'type': 'alias', 'value': 1})
+ value = await alias_kvs.get_value('test_key')
+ assert value['type'] == 'alias'
+
+ await alias_kvs.drop()
+
+
+async def test_alias_vs_named_isolation(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that alias and named key-value stores with same identifier are isolated."""
+ # Create named kvs
+ named_kvs = await KeyValueStore.open(
+ name='test_identifier',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify named kvs
+ assert named_kvs.name == 'test_identifier'
+ await named_kvs.set_value('type', 'named')
+
+ # Clean up named kvs first
+ await named_kvs.drop()
+
+ # Now create alias kvs with same identifier (should work after cleanup)
+ alias_kvs = await KeyValueStore.open(
+ alias='test_identifier',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be different instance
+ assert alias_kvs.name is None
+ await alias_kvs.set_value('type', 'alias')
+
+ # Verify alias data
+ alias_value = await alias_kvs.get_value('type')
+ assert alias_value == 'alias'
+
+ await alias_kvs.drop()
+
+
+async def test_default_vs_alias_default_equivalence(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that default key-value store and alias='default' are equivalent."""
+ # Open default kvs
+ default_kvs = await KeyValueStore.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Open alias='default' kvs
+ alias_default_kvs = await KeyValueStore.open(
+ alias='default',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be the same
+ assert default_kvs.id == alias_default_kvs.id
+ assert default_kvs.name is None
+ assert alias_default_kvs.name is None
+
+ # Data should be shared
+ await default_kvs.set_value('source', 'default')
+ value = await alias_default_kvs.get_value('source')
+ assert value == 'default'
+
+ await default_kvs.drop()
+
+
+async def test_multiple_alias_isolation(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that different aliases create separate key-value stores."""
+ kvs_stores = []
+
+ for i in range(3):
+ kvs = await KeyValueStore.open(
+ alias=f'alias_{i}',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ await kvs.set_value('alias', f'alias_{i}')
+ await kvs.set_value('index', i)
+ kvs_stores.append(kvs)
+
+ # All should be different
+ for i in range(3):
+ for j in range(i + 1, 3):
+ assert kvs_stores[i].id != kvs_stores[j].id
+
+ # Verify data isolation
+ for i, kvs in enumerate(kvs_stores):
+ alias_value = await kvs.get_value('alias')
+ index_value = await kvs.get_value('index')
+ assert alias_value == f'alias_{i}'
+ # For memory storage, value is preserved as int; for filesystem it's converted to string
+ assert index_value in (i, str(i))
+ await kvs.drop()
+
+
+async def test_purge_on_start_enabled(storage_client: StorageClient) -> None:
+ """Test purge behavior when purge_on_start=True: named storages retain data, unnamed storages are purged."""
+
+ # Skip this test for memory storage since it doesn't persist data between client instances.
+ if storage_client.__class__ == MemoryStorageClient:
+ pytest.skip('Memory storage does not persist data between client instances.')
+
+ configuration = Configuration(purge_on_start=True)
+
+ # First, create all storage types with purge enabled and add data.
+ default_kvs = await KeyValueStore.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ alias_kvs = await KeyValueStore.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ named_kvs = await KeyValueStore.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ await default_kvs.set_value(key='data', value='should_be_purged')
+ await alias_kvs.set_value(key='data', value='should_be_purged')
+ await named_kvs.set_value(key='data', value='should_persist')
+
+ # Verify data was added
+ default_data = await default_kvs.get_value(key='data')
+ alias_data = await alias_kvs.get_value(key='data')
+ named_data = await named_kvs.get_value(key='data')
+
+ assert default_data == 'should_be_purged'
+ assert alias_data == 'should_be_purged'
+ assert named_data == 'should_persist'
+
+ # Verify that default and alias storages are unnamed
+ default_metadata = await default_kvs.get_metadata()
+ alias_metadata = await alias_kvs.get_metadata()
+ named_metadata = await named_kvs.get_metadata()
+
+ assert default_metadata.name is None
+ assert alias_metadata.name is None
+ assert named_metadata.name == 'purge_test_named'
+
+ # Clear storage cache to simulate "reopening" storages
+ service_locator.storage_instance_manager.clear_cache()
+
+ # Now "reopen" all storages
+ default_kvs_2 = await KeyValueStore.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ alias_kvs_2 = await KeyValueStore.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ named_kvs_2 = await KeyValueStore.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Check the data after purge
+ default_data_after = await default_kvs_2.get_value(key='data')
+ alias_data_after = await alias_kvs_2.get_value(key='data')
+ named_data_after = await named_kvs_2.get_value(key='data')
+
+ # Unnamed storages (alias and default) should be purged (data removed)
+ assert default_data_after is None
+ assert alias_data_after is None
+
+ # Named storage should retain data (not purged)
+ assert named_data_after == 'should_persist'
+
+ # Clean up
+ await named_kvs_2.drop()
+ await alias_kvs_2.drop()
+ await default_kvs_2.drop()
+
+
+async def test_purge_on_start_disabled(storage_client: StorageClient) -> None:
+ """Test purge behavior when purge_on_start=False: all storages retain data regardless of type."""
+
+ # Skip this test for memory storage since it doesn't persist data between client instances.
+ if storage_client.__class__ == MemoryStorageClient:
+ pytest.skip('Memory storage does not persist data between client instances.')
+
+ configuration = Configuration(purge_on_start=False)
+
+ # First, create all storage types with purge disabled and add data.
+ default_kvs = await KeyValueStore.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ alias_kvs = await KeyValueStore.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ named_kvs = await KeyValueStore.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ await default_kvs.set_value('data', 'should_persist')
+ await alias_kvs.set_value('data', 'should_persist')
+ await named_kvs.set_value('data', 'should_persist')
+
+ # Verify data was added
+ default_data = await default_kvs.get_value('data')
+ alias_data = await alias_kvs.get_value('data')
+ named_data = await named_kvs.get_value('data')
+
+ assert default_data == 'should_persist'
+ assert alias_data == 'should_persist'
+ assert named_data == 'should_persist'
+
+ # Clear storage cache to simulate "reopening" storages
+ service_locator.storage_instance_manager.clear_cache()
+
+ # Now "reopen" all storages
+ default_kvs_2 = await KeyValueStore.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ alias_kvs_2 = await KeyValueStore.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ named_kvs_2 = await KeyValueStore.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Check the data after reopen
+ default_data_after = await default_kvs_2.get_value('data')
+ alias_data_after = await alias_kvs_2.get_value('data')
+ named_data_after = await named_kvs_2.get_value('data')
+
+ # All storages should retain data when purge is disabled
+ assert default_data_after == 'should_persist'
+ assert alias_data_after == 'should_persist'
+ assert named_data_after == 'should_persist'
+
+ # Clean up
+ await named_kvs_2.drop()
+ await alias_kvs_2.drop()
+ await default_kvs_2.drop()
diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py
index 135e732b33..b2bfda0394 100644
--- a/tests/unit/storages/test_request_queue.py
+++ b/tests/unit/storages/test_request_queue.py
@@ -99,7 +99,7 @@ async def test_open_with_id_and_name(
configuration: Configuration,
) -> None:
"""Test that open() raises an error when both id and name are provided."""
- with pytest.raises(ValueError, match=r'Only one of "id" or "name" can be specified'):
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
await RequestQueue.open(
id='some-id',
name='some-name',
@@ -642,3 +642,658 @@ async def test_purge(
# Clean up
await rq.drop()
+
+
+async def test_open_with_alias(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test opening request queues with alias parameter for NDU functionality."""
+ # Create request queues with different aliases
+ rq_1 = await RequestQueue.open(
+ alias='test_alias_1',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ rq_2 = await RequestQueue.open(
+ alias='test_alias_2',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify they have different IDs but no names (unnamed)
+ assert rq_1.id != rq_2.id
+ assert rq_1.name is None
+ assert rq_2.name is None
+
+ # Add different requests to each
+ await rq_1.add_request('https://example.com/1')
+ await rq_1.add_request('https://example.com/2')
+ await rq_2.add_request('https://example.com/3')
+
+ # Verify data isolation
+ request_1 = await rq_1.fetch_next_request()
+ request_2 = await rq_2.fetch_next_request()
+
+ assert request_1 is not None
+ assert request_2 is not None
+ assert request_1.url == 'https://example.com/1'
+ assert request_2.url == 'https://example.com/3'
+
+ # Clean up
+ await rq_1.drop()
+ await rq_2.drop()
+
+
+async def test_alias_caching(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that request queues with same alias return same instance (cached)."""
+ # Open rq with alias
+ rq_1 = await RequestQueue.open(
+ alias='cache_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Open again with same alias
+ rq_2 = await RequestQueue.open(
+ alias='cache_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be same instance
+ assert rq_1 is rq_2
+ assert rq_1.id == rq_2.id
+
+ # Clean up
+ await rq_1.drop()
+
+
+async def test_alias_with_id_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing both alias and id raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await RequestQueue.open(
+ id='some-id',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_name_error(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that providing both alias and name raises error."""
+ with pytest.raises(ValueError, match=r'Only one of "id", "name", or "alias" can be specified, not multiple.'):
+ await RequestQueue.open(
+ name='some-name',
+ alias='some-alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+
+async def test_alias_with_special_characters(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test alias functionality with special characters."""
+ special_aliases = [
+ 'alias-with-dashes',
+ 'alias_with_underscores',
+ 'alias.with.dots',
+ 'alias123with456numbers',
+ 'CamelCaseAlias',
+ ]
+
+ queues = []
+ for alias in special_aliases:
+ rq = await RequestQueue.open(
+ alias=alias,
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ queues.append(rq)
+
+ # Add request with the alias as identifier in URL
+ await rq.add_request(f'https://example.com/{alias}')
+
+ # Verify all work correctly
+ for i, rq in enumerate(queues):
+ request = await rq.fetch_next_request()
+ assert request is not None
+ assert f'/{special_aliases[i]}' in request.url
+
+ # Clean up
+ for rq in queues:
+ await rq.drop()
+
+
+async def test_alias_request_operations(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that request operations work correctly with alias queues."""
+ rq = await RequestQueue.open(
+ alias='request_ops_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Test adding multiple requests
+ urls = [
+ 'https://example.com/page1',
+ 'https://example.com/page2',
+ 'https://example.com/page3',
+ ]
+
+ for url in urls:
+ result = await rq.add_request(url)
+ assert result.was_already_present is False
+
+ # Test queue metadata
+ metadata = await rq.get_metadata()
+ assert metadata.total_request_count == 3
+ assert metadata.pending_request_count == 3
+ assert metadata.handled_request_count == 0
+
+ # Test fetching and handling requests
+ processed_urls = []
+ while not await rq.is_empty():
+ request = await rq.fetch_next_request()
+ if request:
+ processed_urls.append(request.url)
+ await rq.mark_request_as_handled(request)
+
+ # Verify all requests were processed
+ assert len(processed_urls) == 3
+ assert set(processed_urls) == set(urls)
+
+ # Verify final state
+ metadata = await rq.get_metadata()
+ assert metadata.pending_request_count == 0
+ assert metadata.handled_request_count == 3
+ assert await rq.is_empty() is True
+
+ # Clean up
+ await rq.drop()
+
+
+async def test_alias_forefront_operations(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test forefront operations work correctly with alias queues."""
+ rq = await RequestQueue.open(
+ alias='forefront_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Add normal requests
+ await rq.add_request('https://example.com/normal1')
+ await rq.add_request('https://example.com/normal2')
+
+ # Add priority request to forefront
+ await rq.add_request('https://example.com/priority', forefront=True)
+
+ # Priority request should come first
+ priority_request = await rq.fetch_next_request()
+ assert priority_request is not None
+ assert priority_request.url == 'https://example.com/priority'
+
+ # Then normal requests
+ normal_request = await rq.fetch_next_request()
+ assert normal_request is not None
+ assert normal_request.url == 'https://example.com/normal1'
+
+ # Clean up
+ await rq.drop()
+
+
+async def test_alias_batch_operations(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test batch operations work correctly with alias queues."""
+ rq = await RequestQueue.open(
+ alias='batch_test',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Test batch adding
+ batch_urls = [
+ 'https://example.com/batch1',
+ 'https://example.com/batch2',
+ 'https://example.com/batch3',
+ ]
+
+ await rq.add_requests(batch_urls)
+
+ # Wait for background processing
+ await asyncio.sleep(0.1)
+
+ # Verify all requests were added
+ metadata = await rq.get_metadata()
+ assert metadata.total_request_count == 3
+
+ # Clean up
+ await rq.drop()
+
+
+async def test_named_vs_alias_conflict_detection() -> None:
+ """Test that conflicts between named and alias storages are detected."""
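+ # No storage_client or configuration is passed here, so the defaults from the service locator are used.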
+ # Test 1: Create named storage first, then try alias with same name
+ named_rq = await RequestQueue.open(name='conflict_test')
+ assert named_rq.name == 'conflict_test'
+
+ # Try to create alias with same name - should raise error
+ with pytest.raises(ValueError, match=r'Cannot create alias storage "conflict_test".*already exists'):
+ await RequestQueue.open(alias='conflict_test')
+
+ # Clean up
+ await named_rq.drop()
+
+ # Test 2: Create alias first, then try named with same name
+ alias_rq = await RequestQueue.open(alias='conflict_test2')
+ assert alias_rq.name is None # Alias storages have no name
+
+ # Try to create named with same name - should raise error
+ with pytest.raises(ValueError, match=r'Cannot create named storage "conflict_test2".*already exists'):
+ await RequestQueue.open(name='conflict_test2')
+
+ # Clean up
+ await alias_rq.drop()
+
+ # Test 3: Different names should work fine
+ named_rq_ok = await RequestQueue.open(name='different_name')
+ alias_rq_ok = await RequestQueue.open(alias='different_alias')
+
+ assert named_rq_ok.name == 'different_name'
+ assert alias_rq_ok.name is None
+
+ # Clean up
+ await named_rq_ok.drop()
+ await alias_rq_ok.drop()
+
+
+async def test_alias_parameter(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test request queue creation and operations with alias parameter."""
+ # Create request queue with alias
+ alias_rq = await RequestQueue.open(
+ alias='test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify alias request queue properties
+ assert alias_rq.id is not None
+ assert alias_rq.name is None # Alias storages should be unnamed
+
+ # Test data operations
+ await alias_rq.add_request('https://example.com/alias')
+ metadata = await alias_rq.get_metadata()
+ assert metadata.pending_request_count == 1
+
+ await alias_rq.drop()
+
+
+async def test_alias_vs_named_isolation(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that alias and named request queues with same identifier are isolated."""
+ # Create named request queue
+ named_rq = await RequestQueue.open(
+ name='test_identifier',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Verify named request queue
+ assert named_rq.name == 'test_identifier'
+ await named_rq.add_request('https://named.example.com')
+
+ # Clean up named request queue first
+ await named_rq.drop()
+
+ # Now create alias request queue with same identifier (should work after cleanup)
+ alias_rq = await RequestQueue.open(
+ alias='test_identifier',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be different instance
+ assert alias_rq.name is None
+ await alias_rq.add_request('https://alias.example.com')
+
+ # Verify alias data
+ alias_request = await alias_rq.fetch_next_request()
+ assert alias_request is not None
+ assert alias_request.url == 'https://alias.example.com'
+
+ await alias_rq.drop()
+
+
+async def test_default_vs_alias_default_equivalence(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that default request queue and alias='default' are equivalent."""
+ # Open default request queue
+ default_rq = await RequestQueue.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Open alias='default' request queue
+ alias_default_rq = await RequestQueue.open(
+ alias='default',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Should be the same
+ assert default_rq.id == alias_default_rq.id
+ assert default_rq.name is None
+ assert alias_default_rq.name is None
+
+ # Data should be shared
+ await default_rq.add_request('https://default.example.com')
+ metadata = await alias_default_rq.get_metadata()
+ assert metadata.pending_request_count == 1
+
+ await default_rq.drop()
+
+
+async def test_multiple_alias_isolation(
+ storage_client: StorageClient,
+ configuration: Configuration,
+) -> None:
+ """Test that different aliases create separate request queues."""
+ request_queues = []
+
+ for i in range(3):
+ rq = await RequestQueue.open(
+ alias=f'alias_{i}',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ await rq.add_request(f'https://example.com/alias_{i}')
+ request_queues.append(rq)
+
+ # All should be different
+ for i in range(3):
+ for j in range(i + 1, 3):
+ assert request_queues[i].id != request_queues[j].id
+
+ # Verify data isolation
+ for i, rq in enumerate(request_queues):
+ request = await rq.fetch_next_request()
+ assert request is not None
+ assert request.url == f'https://example.com/alias_{i}'
+ await rq.drop()
+
+
+async def test_purge_on_start_enabled(storage_client: StorageClient) -> None:
+ """Test purge behavior when purge_on_start=True: named storages retain data, unnamed storages are purged."""
+
+ # Skip this test for memory storage since it doesn't persist data between client instances.
+ if storage_client.__class__.__name__ == 'MemoryStorageClient':
+ pytest.skip('Memory storage does not persist data between client instances.')
+
+ configuration = Configuration(purge_on_start=True)
+
+ # First, create all storage types with purge enabled and add data.
+ default_rq = await RequestQueue.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ alias_rq = await RequestQueue.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ named_rq = await RequestQueue.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ await default_rq.add_requests(
+ [
+ 'https://default.example.com/1',
+ 'https://default.example.com/2',
+ 'https://default.example.com/3',
+ ]
+ )
+ await alias_rq.add_requests(
+ [
+ 'https://alias.example.com/1',
+ 'https://alias.example.com/2',
+ 'https://alias.example.com/3',
+ ]
+ )
+ await named_rq.add_requests(
+ [
+ 'https://named.example.com/1',
+ 'https://named.example.com/2',
+ 'https://named.example.com/3',
+ ]
+ )
+
+ default_request = await default_rq.fetch_next_request()
+ alias_request = await alias_rq.fetch_next_request()
+ named_request = await named_rq.fetch_next_request()
+
+ assert default_request is not None
+ assert alias_request is not None
+ assert named_request is not None
+
+ await default_rq.mark_request_as_handled(default_request)
+ await alias_rq.mark_request_as_handled(alias_request)
+ await named_rq.mark_request_as_handled(named_request)
+
+ # Verify data was added
+ default_metadata = await default_rq.get_metadata()
+ alias_metadata = await alias_rq.get_metadata()
+ named_metadata = await named_rq.get_metadata()
+
+ assert default_metadata.pending_request_count == 2
+ assert alias_metadata.pending_request_count == 2
+ assert named_metadata.pending_request_count == 2
+
+ assert default_metadata.handled_request_count == 1
+ assert alias_metadata.handled_request_count == 1
+ assert named_metadata.handled_request_count == 1
+
+ assert default_metadata.total_request_count == 3
+ assert alias_metadata.total_request_count == 3
+ assert named_metadata.total_request_count == 3
+
+ # Verify that default and alias storages are unnamed
+ assert default_metadata.name is None
+ assert alias_metadata.name is None
+ assert named_metadata.name == 'purge_test_named'
+
+ # Clear storage cache to simulate "reopening" storages
+ service_locator.storage_instance_manager.clear_cache()
+
+ # Now "reopen" all storages
+ default_rq_2 = await RequestQueue.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ alias_rq_2 = await RequestQueue.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ named_rq_2 = await RequestQueue.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Check the data after purge
+ default_metadata_after = await default_rq_2.get_metadata()
+ alias_metadata_after = await alias_rq_2.get_metadata()
+ named_metadata_after = await named_rq_2.get_metadata()
+
+ # Unnamed storages (alias and default) should be purged (data removed)
+ assert default_metadata_after.pending_request_count == 0
+ assert alias_metadata_after.pending_request_count == 0
+ assert named_metadata_after.pending_request_count == 2
+
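+ # Purging clears pending requests; the handled and total counters kept in metadata are expected to persist.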
+ assert default_metadata_after.handled_request_count == 1
+ assert alias_metadata_after.handled_request_count == 1
+ assert named_metadata_after.handled_request_count == 1
+
+ assert default_metadata_after.total_request_count == 3
+ assert alias_metadata_after.total_request_count == 3
+ assert named_metadata_after.total_request_count == 3
+
+ # Clean up
+ await named_rq_2.drop()
+ await alias_rq_2.drop()
+ await default_rq_2.drop()
+
+
+async def test_purge_on_start_disabled(storage_client: StorageClient) -> None:
+ """Test purge behavior when purge_on_start=False: all storages retain data regardless of type."""
+
+ # Skip this test for memory storage since it doesn't persist data between client instances.
+ if storage_client.__class__.__name__ == 'MemoryStorageClient':
+ pytest.skip('Memory storage does not persist data between client instances.')
+
+ configuration = Configuration(purge_on_start=False)
+
+ # First, create all storage types with purge disabled and add data.
+ default_rq = await RequestQueue.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ alias_rq = await RequestQueue.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ named_rq = await RequestQueue.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ await default_rq.add_requests(
+ [
+ 'https://default.example.com/1',
+ 'https://default.example.com/2',
+ 'https://default.example.com/3',
+ ]
+ )
+ await alias_rq.add_requests(
+ [
+ 'https://alias.example.com/1',
+ 'https://alias.example.com/2',
+ 'https://alias.example.com/3',
+ ]
+ )
+ await named_rq.add_requests(
+ [
+ 'https://named.example.com/1',
+ 'https://named.example.com/2',
+ 'https://named.example.com/3',
+ ]
+ )
+
+ default_request = await default_rq.fetch_next_request()
+ alias_request = await alias_rq.fetch_next_request()
+ named_request = await named_rq.fetch_next_request()
+
+ assert default_request is not None
+ assert alias_request is not None
+ assert named_request is not None
+
+ await default_rq.mark_request_as_handled(default_request)
+ await alias_rq.mark_request_as_handled(alias_request)
+ await named_rq.mark_request_as_handled(named_request)
+
+ # Verify data was added
+ default_metadata = await default_rq.get_metadata()
+ alias_metadata = await alias_rq.get_metadata()
+ named_metadata = await named_rq.get_metadata()
+
+ assert default_metadata.pending_request_count == 2
+ assert alias_metadata.pending_request_count == 2
+ assert named_metadata.pending_request_count == 2
+
+ assert default_metadata.handled_request_count == 1
+ assert alias_metadata.handled_request_count == 1
+ assert named_metadata.handled_request_count == 1
+
+ assert default_metadata.total_request_count == 3
+ assert alias_metadata.total_request_count == 3
+ assert named_metadata.total_request_count == 3
+
+ # Verify that default and alias storages are unnamed
+ assert default_metadata.name is None
+ assert alias_metadata.name is None
+ assert named_metadata.name == 'purge_test_named'
+
+ # Clear storage cache to simulate "reopening" storages
+ service_locator.storage_instance_manager.clear_cache()
+
+ # Now "reopen" all storages
+ default_rq_2 = await RequestQueue.open(
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ alias_rq_2 = await RequestQueue.open(
+ alias='purge_test_alias',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+ named_rq_2 = await RequestQueue.open(
+ name='purge_test_named',
+ storage_client=storage_client,
+ configuration=configuration,
+ )
+
+ # Check the data after reopen
+ default_metadata_after = await default_rq_2.get_metadata()
+ alias_metadata_after = await alias_rq_2.get_metadata()
+ named_metadata_after = await named_rq_2.get_metadata()
+
+ # All storages should retain their requests when purge is disabled
+ assert default_metadata_after.pending_request_count == 2
+ assert alias_metadata_after.pending_request_count == 2
+ assert named_metadata_after.pending_request_count == 2
+
+ assert default_metadata_after.handled_request_count == 1
+ assert alias_metadata_after.handled_request_count == 1
+ assert named_metadata_after.handled_request_count == 1
+
+ assert default_metadata_after.total_request_count == 3
+ assert alias_metadata_after.total_request_count == 3
+ assert named_metadata_after.total_request_count == 3
+
+ # Clean up
+ await named_rq_2.drop()
+ await alias_rq_2.drop()
+ await default_rq_2.drop()