diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py
index 639ef49ce6..286dfde3a4 100644
--- a/src/crawlee/storages/_request_queue.py
+++ b/src/crawlee/storages/_request_queue.py
@@ -150,7 +150,28 @@ async def drop(self) -> None:
 
     @override
     async def purge(self) -> None:
-        await self._client.purge()
+        try:
+            await self._client.purge()
+        except NotImplementedError:
+            client_name = type(self._client).__name__
+            if self._name is not None:
+                logger.warning(
+                    f'Storage client "{client_name}" does not support purging the request queue. '
+                    f'Skipping purge for named queue "{self._name}" to avoid destroying persistent data; '
+                    'the queue contents are left intact.'
+                )
+                return
+            logger.warning(
+                f'Storage client "{client_name}" does not support purging the request queue. '
+                'Falling back to dropping and recreating the unnamed queue; the request queue ID may change.'
+            )
+            await self.drop()
+            # Override `purge_on_start` so the storage client does not try to purge the freshly recreated
+            # (and necessarily empty) queue and re-raise the same `NotImplementedError`.
+            recreate_config = service_locator.get_configuration().model_copy(update={'purge_on_start': False})
+            new_rq = await RequestQueue.open(configuration=recreate_config)
+            self._client = new_rq._client  # noqa: SLF001
+            self._id = new_rq._id  # noqa: SLF001
 
     @override
     async def add_request(
diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py
index 09ce769d9e..d77d524150 100644
--- a/tests/unit/storages/test_request_queue.py
+++ b/tests/unit/storages/test_request_queue.py
@@ -715,6 +715,94 @@ async def test_purge(
     await rq.drop()
 
 
+async def test_purge_falls_back_to_drop_for_unnamed_queue_when_not_implemented(
+    storage_client: StorageClient,
+    caplog: pytest.LogCaptureFixture,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Test that `purge` falls back to drop+recreate for unnamed queues when the client raises `NotImplementedError`.
+
+    Some storage clients (e.g. the Apify platform client) do not support purging. For the default unnamed queue
+    used by `BasicCrawler`, `purge` should drop and recreate the queue so that callers keep working on repeated
+    runs. Named queues are handled separately to avoid destroying persistent data.
+    """
+    rq = await RequestQueue.open(storage_client=storage_client)
+    assert rq.name is None
+
+    await rq.add_requests(['https://example.com/1', 'https://example.com/2'])
+    metadata = await rq.get_metadata()
+    assert metadata.pending_request_count == 2
+
+    async def _raise_not_implemented(self: object) -> None:
+        raise NotImplementedError('Purge is not supported.')
+
+    monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented)
+
+    with caplog.at_level('WARNING'):
+        await rq.purge()
+
+    assert any(
+        'does not support purging' in rec.message and 'dropping and recreating' in rec.message for rec in caplog.records
+    )
+
+    # The queue should be empty, usable, and backed by a fresh client (id may differ for backends that mint new ids).
+    metadata = await rq.get_metadata()
+    assert metadata.pending_request_count == 0
+    assert metadata.total_request_count == 0
+    assert metadata.handled_request_count == 0
+    assert rq.id is not None
+
+    await rq.add_request('https://example.com/after-purge')
+    request = await rq.fetch_next_request()
+    assert request is not None
+    assert request.url == 'https://example.com/after-purge'
+
+    await rq.drop()
+
+
+async def test_purge_skips_named_queue_when_not_implemented(
+    storage_client: StorageClient,
+    caplog: pytest.LogCaptureFixture,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Test that `purge` is a logged no-op for named queues when the client raises `NotImplementedError`.
+
+    Named queues are considered persistent (e.g. shared across runs on the Apify platform), so falling back
+    to drop+recreate would silently destroy user data. Instead `purge` logs a warning and leaves the queue
+    intact.
+    """
+    rq = await RequestQueue.open(
+        name='purge-fallback-named-test',
+        storage_client=storage_client,
+    )
+    original_id = rq.id
+
+    await rq.add_requests(['https://example.com/1', 'https://example.com/2'])
+    metadata = await rq.get_metadata()
+    assert metadata.pending_request_count == 2
+
+    async def _raise_not_implemented(self: object) -> None:
+        raise NotImplementedError('Purge is not supported.')
+
+    monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented)
+
+    with caplog.at_level('WARNING'):
+        await rq.purge()
+
+    assert any(
+        'does not support purging' in rec.message and 'Skipping purge for named queue' in rec.message
+        for rec in caplog.records
+    )
+
+    # Queue identity and contents must be preserved.
+    assert rq.id == original_id
+    metadata = await rq.get_metadata()
+    assert metadata.pending_request_count == 2
+    assert metadata.total_request_count == 2
+
+    await rq.drop()
+
+
 async def test_open_with_alias(
     storage_client: StorageClient,
 ) -> None: