diff --git a/docs/guides/code_examples/request_loaders/rl_basic_example_with_persist.py b/docs/guides/code_examples/request_loaders/rl_basic_example_with_persist.py
new file mode 100644
index 0000000000..a3d2f89304
--- /dev/null
+++ b/docs/guides/code_examples/request_loaders/rl_basic_example_with_persist.py
@@ -0,0 +1,46 @@
+import asyncio
+import logging
+
+from crawlee import service_locator
+from crawlee.request_loaders import RequestList
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s-%(message)s')
+logger = logging.getLogger(__name__)
+
+
+# Disable clearing the `KeyValueStore` on each run.
+# This is necessary so that the state keys are not cleared at startup.
+# The recommended way to achieve this behavior is to set the environment variable
+# `CRAWLEE_PURGE_ON_START=0`.
+configuration = service_locator.get_configuration()
+configuration.purge_on_start = False
+
+
+async def main() -> None:
+    # Create the request list. If persisted state already exists, it will be restored.
+    # Leave the name empty to use the default request list.
+    request_list = RequestList(
+        name='my-request-list',
+        requests=[
+            'https://apify.com/',
+            'https://crawlee.dev/',
+            'https://crawlee.dev/python/',
+        ],
+        # Enable persistence
+        persist_state_key='my-persist-state',
+        persist_requests_key='my-persist-requests',
+    )
+
+    # Fetch a single request.
+    # Each run picks up the next unprocessed request until the `RequestList` is exhausted.
+    request = await request_list.fetch_next_request()
+    if request:
+        logger.info(f'Processing request: {request.url}')
+        # Do something with it...
+
+        # And mark it as handled.
+        await request_list.mark_request_as_handled(request)
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/guides/code_examples/request_loaders/sitemap_basic_example.py b/docs/guides/code_examples/request_loaders/sitemap_basic_example.py
index 0b367e4710..07beff458f 100644
--- a/docs/guides/code_examples/request_loaders/sitemap_basic_example.py
+++ b/docs/guides/code_examples/request_loaders/sitemap_basic_example.py
@@ -17,6 +17,7 @@ async def main() -> None:
         max_buffer_size=500,  # Keep up to 500 URLs in memory before processing.
     )
 
+    # Work with the loader until all relevant links from the sitemap have been processed.
     while request := await sitemap_loader.fetch_next_request():
         # Do something with it...
         print(f'Processing {request.url}')
diff --git a/docs/guides/code_examples/request_loaders/sitemap_example_with_persist.py b/docs/guides/code_examples/request_loaders/sitemap_example_with_persist.py
new file mode 100644
index 0000000000..3b7f3b456d
--- /dev/null
+++ b/docs/guides/code_examples/request_loaders/sitemap_example_with_persist.py
@@ -0,0 +1,45 @@
+import asyncio
+import logging
+
+from crawlee import service_locator
+from crawlee.http_clients import ImpitHttpClient
+from crawlee.request_loaders import SitemapRequestLoader
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s-%(message)s')
+logger = logging.getLogger(__name__)
+
+
+# Disable clearing the `KeyValueStore` on each run.
+# This is necessary so that the state keys are not cleared at startup.
+# The recommended way to achieve this behavior is to set the environment variable
+# `CRAWLEE_PURGE_ON_START=0`.
+configuration = service_locator.get_configuration()
+configuration.purge_on_start = False
+
+
+async def main() -> None:
+    # Create an HTTP client for fetching sitemaps.
+    # Use `SitemapRequestLoader` as a context manager so that its state is saved correctly
+    # when the work is completed.
+    async with (
+        ImpitHttpClient() as http_client,
+        SitemapRequestLoader(
+            sitemap_urls=['https://crawlee.dev/sitemap.xml'],
+            http_client=http_client,
+            # Enable persistence
+            persist_state_key='my-persist-state',
+        ) as sitemap_loader,
+    ):
+        # Fetch a single request.
+        # Each run picks up the next unprocessed request until the sitemap is exhausted.
+        request = await sitemap_loader.fetch_next_request()
+        if request:
+            logger.info(f'Processing request: {request.url}')
+            # Do something with it...
+
+            # And mark it as handled.
+            await sitemap_loader.mark_request_as_handled(request)
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/guides/request_loaders.mdx b/docs/guides/request_loaders.mdx
index e3a19be46c..bfef65a411 100644
--- a/docs/guides/request_loaders.mdx
+++ b/docs/guides/request_loaders.mdx
@@ -15,6 +15,8 @@ import RlTandemExample from '!!raw-loader!roa-loader!./code_examples/request_loa
 import RlExplicitTandemExample from '!!raw-loader!roa-loader!./code_examples/request_loaders/rl_tandem_example_explicit.py';
 import SitemapTandemExample from '!!raw-loader!roa-loader!./code_examples/request_loaders/sitemap_tandem_example.py';
 import SitemapExplicitTandemExample from '!!raw-loader!roa-loader!./code_examples/request_loaders/sitemap_tandem_example_explicit.py';
+import RlBasicPersistExample from '!!raw-loader!roa-loader!./code_examples/request_loaders/rl_basic_example_with_persist.py';
+import SitemapPersistExample from '!!raw-loader!roa-loader!./code_examples/request_loaders/sitemap_example_with_persist.py';
 
 The [`request_loaders`](https://github.com/apify/crawlee-python/tree/master/src/crawlee/request_loaders) sub-package extends the functionality of the `RequestQueue`, providing additional tools for managing URLs and requests. If you are new to Crawlee and unfamiliar with the `RequestQueue`, consider starting with the [Storages](https://crawlee.dev/python/docs/guides/storages) guide first. Request loaders define how requests are fetched and stored, enabling various use cases such as reading URLs from files, external APIs, or combining multiple sources together.
 
@@ -116,6 +118,16 @@ Here is a basic example of working with the `Req
 
 {RlBasicExample}
 
+### Request list with persistence
+
+The `RequestList` supports state persistence, allowing it to resume from where it left off after an interruption. This is particularly useful for long-running crawls or when you need to pause a crawl and resume it later.
+
+To enable persistence, provide the `persist_state_key` parameter and optionally `persist_requests_key`, and disable automatic cleanup by setting `purge_on_start = False` in the configuration. The `persist_state_key` stores the loader's progress, while `persist_requests_key` stores the request data itself so that it stays consistent between runs. For more details on resuming interrupted crawls, see the [Resuming a paused crawl](../examples/resuming-paused-crawl) example.
+
+    {RlBasicPersistExample}
+
 ### Sitemap request loader
 
 The `SitemapRequestLoader` is a specialized request loader that reads URLs from XML sitemaps. It's particularly useful when you want to crawl a website systematically by following its sitemap structure. The loader supports filtering URLs using glob patterns and regular expressions, allowing you to include or exclude specific types of URLs. The `SitemapRequestLoader` provides streaming processing of sitemaps, ensuring efficient memory usage without loading the entire sitemap into memory.
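Note for readers of the new guide section above: the persistence example drains the `RequestList` by hand to keep the snippet short, but in a typical project the persisted list would be handed to a crawler. The following is a minimal sketch of that wiring, assuming the `to_tandem()` helper and the crawler's `request_manager` option used in the tandem examples of this guide; the crawler class and URLs here are illustrative and not part of this change.

```python
import asyncio

from crawlee import service_locator
from crawlee.crawlers import ParselCrawler, ParselCrawlingContext
from crawlee.request_loaders import RequestList

# Keep persisted state between runs, as in the examples above.
service_locator.get_configuration().purge_on_start = False


async def main() -> None:
    request_list = RequestList(
        name='my-request-list',
        requests=['https://crawlee.dev/', 'https://apify.com/'],
        persist_state_key='my-persist-state',
        persist_requests_key='my-persist-requests',
    )

    # Wrap the read-only loader in a tandem so the crawler can also enqueue new links.
    request_manager = await request_list.to_tandem()
    crawler = ParselCrawler(request_manager=request_manager)

    @crawler.router.default_handler
    async def handler(context: ParselCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    await crawler.run()


if __name__ == '__main__':
    asyncio.run(main())
```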
@@ -124,6 +136,16 @@ The `SitemapRequestLoader` is
 
 {SitemapExample}
 
+### Sitemap request loader with persistence
+
+Similarly, the `SitemapRequestLoader` supports state persistence, so it can resume processing from where it left off. This is especially valuable when processing large sitemaps that may take considerable time to complete.
+
+    {SitemapPersistExample}
+
+When using persistence with the `SitemapRequestLoader`, make sure to use the context manager (`async with`) so that the state is saved properly when the work is completed.
+
 ## Request managers
 
 The `RequestManager` extends `RequestLoader` with write capabilities. In addition to reading requests, a request manager can add and reclaim them. This is essential for dynamic crawling projects where new URLs may emerge during the crawl process, or when certain requests fail and need to be retried. For more details, refer to the `RequestManager` API reference.
diff --git a/src/crawlee/_utils/sitemap.py b/src/crawlee/_utils/sitemap.py
index 6a5b85d710..c6ee552b74 100644
--- a/src/crawlee/_utils/sitemap.py
+++ b/src/crawlee/_utils/sitemap.py
@@ -9,6 +9,7 @@
 from hashlib import sha256
 from logging import getLogger
 from typing import TYPE_CHECKING, Literal, TypedDict
+from xml.sax import SAXParseException
 from xml.sax.expatreader import ExpatParser
 from xml.sax.handler import ContentHandler
 
@@ -192,7 +193,8 @@ async def flush(self) -> AsyncGenerator[_SitemapItem, None]:
 
     def close(self) -> None:
         """Clean up resources."""
-        self._parser.close()
+        with suppress(SAXParseException):
+            self._parser.close()
 
 
 def _get_parser(content_type: str = '', url: str | None = None) -> _XmlSitemapParser | _TxtSitemapParser:
diff --git a/src/crawlee/request_loaders/_sitemap_request_loader.py b/src/crawlee/request_loaders/_sitemap_request_loader.py
index def45c7cf7..441f2886d0 100644
--- a/src/crawlee/request_loaders/_sitemap_request_loader.py
+++ b/src/crawlee/request_loaders/_sitemap_request_loader.py
@@ -1,19 +1,25 @@
 from __future__ import annotations
 
 import asyncio
+from collections import deque
 from contextlib import suppress
 from logging import getLogger
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Annotated, Any
+
+from pydantic import BaseModel, ConfigDict, Field
+from typing_extensions import override
 
 from crawlee import Request
 from crawlee._utils.docs import docs_group
 from crawlee._utils.globs import Glob
-from crawlee._utils.sitemap import ParseSitemapOptions, SitemapSource, SitemapUrl, parse_sitemap
+from crawlee._utils.recoverable_state import RecoverableState
+from crawlee._utils.sitemap import NestedSitemap, ParseSitemapOptions, SitemapSource, SitemapUrl, parse_sitemap
 from crawlee.request_loaders._request_loader import RequestLoader
 
 if TYPE_CHECKING:
     import re
     from collections.abc import Sequence
+    from types import TracebackType
 
     from crawlee.http_clients import HttpClient
     from crawlee.proxy_configuration import ProxyInfo
@@ -23,12 +29,72 @@
 logger = getLogger(__name__)
 
 
+class SitemapRequestLoaderState(BaseModel):
+    """State model for persisting sitemap request loader data.
+
+    The loader processes one sitemap at a time. The current sitemap is stored in `in_progress_sitemap_url`.
+    The `parse_sitemap` function parses the sitemap and returns elements as an async iterator. Each element retrieved
+    from the iterator is processed based on its type. If the element is a `NestedSitemap`, its URL is added to
+    `pending_sitemap_urls` if it hasn't been processed yet (not in `processed_sitemap_urls`). If the element is a
+    `SitemapUrl`, the loader checks whether it already exists in `current_sitemap_processed_urls`. If it exists,
+    the loader was restarted from a saved state and the URL is skipped.
+
+    If the URL is new, it is first added to `url_queue`, then to `current_sitemap_processed_urls`, and `total_count`
+    is incremented by 1. When all elements from the current sitemap iterator have been processed,
+    `in_progress_sitemap_url` is set to `None`, the sitemap URL is added to `processed_sitemap_urls`, and
+    `current_sitemap_processed_urls` is cleared. The next sitemap is retrieved from `pending_sitemap_urls`, skipping
+    any URLs that already exist in `processed_sitemap_urls`. If `pending_sitemap_urls` is empty, `completed` is set
+    to `True`.
+
+    When `fetch_next_request` is called, a URL is extracted from `url_queue` and placed in `in_progress`.
+    When `mark_request_as_handled` is called for the extracted URL, it is removed from `in_progress` and
+    `handled_count` is incremented by 1.
+
+    During initial startup or restart after persistence, state validation occurs in `_get_state`. If both
+    `pending_sitemap_urls` and `in_progress_sitemap_url` are empty and `completed` is `False`, this indicates a
+    fresh start. In this case, `self._sitemap_urls` are moved to `pending_sitemap_urls`. Otherwise, the loader is
+    restarting from a persisted state. If `in_progress` contains any URLs, they are moved back to `url_queue` and
+    `in_progress` is cleared.
+    """
+
+    model_config = ConfigDict(populate_by_name=True)
+
+    url_queue: Annotated[deque[str], Field(alias='urlQueue')]
+    """Queue of URLs extracted from sitemaps and ready for processing."""
+
+    in_progress: Annotated[set[str], Field(alias='inProgress')] = set()
+    """Set of request URLs currently being processed."""
+
+    pending_sitemap_urls: Annotated[deque[str], Field(alias='pendingSitemapUrls')]
+    """Queue of sitemap URLs that need to be fetched and processed."""
+
+    in_progress_sitemap_url: Annotated[str | None, Field(alias='inProgressSitemapUrl')] = None
+    """The sitemap URL currently being processed."""
+
+    current_sitemap_processed_urls: Annotated[set[str], Field(alias='currentSitemapProcessedUrls')] = set()
+    """URLs from the current sitemap that have been added to the queue."""
+
+    processed_sitemap_urls: Annotated[set[str], Field(alias='processedSitemapUrls')] = set()
+    """Set of processed sitemap URLs."""
+
+    completed: Annotated[bool, Field(alias='sitemapCompleted')] = False
+    """Whether all sitemaps have been fully processed."""
+
+    total_count: Annotated[int, Field(alias='totalCount')] = 0
+    """Total number of URLs found and added to the queue from all processed sitemaps."""
+
+    handled_count: Annotated[int, Field(alias='handledCount')] = 0
+    """Number of URLs that have been successfully handled."""
+
+
 @docs_group('Request loaders')
 class SitemapRequestLoader(RequestLoader):
     """A request loader that reads URLs from sitemap(s).
 
     The loader fetches and parses sitemaps in the background, allowing crawling to start before all URLs are loaded.
     It supports filtering URLs using glob and regex patterns.
+
+    The loader supports state persistence, allowing it to resume from where it left off
+    after interruption when a `persist_state_key` is provided during initialization.
     """
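Since `SitemapRequestLoaderState` above is what ends up in the key-value store under `persist_state_key`, a short illustration of the persisted shape may help when reading the camelCase assertions in the tests further down. This is only a sketch that assumes `RecoverableState` serializes the model with its field aliases; the values are made up.

```python
from collections import deque

from crawlee.request_loaders._sitemap_request_loader import SitemapRequestLoaderState

state = SitemapRequestLoaderState(
    url_queue=deque(['https://crawlee.dev/python/']),
    pending_sitemap_urls=deque(),
    in_progress_sitemap_url='https://crawlee.dev/sitemap.xml',
    total_count=1,
)

# Dump with aliases, i.e. roughly the record stored under `persist_state_key`.
print(state.model_dump(mode='json', by_alias=True))
# {'urlQueue': ['https://crawlee.dev/python/'], 'inProgress': [],
#  'pendingSitemapUrls': [], 'inProgressSitemapUrl': 'https://crawlee.dev/sitemap.xml',
#  'currentSitemapProcessedUrls': [], 'processedSitemapUrls': [],
#  'sitemapCompleted': False, 'totalCount': 1, 'handledCount': 0}
```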
""" def __init__( @@ -40,7 +106,7 @@ def __init__( include: list[re.Pattern[Any] | Glob] | None = None, exclude: list[re.Pattern[Any] | Glob] | None = None, max_buffer_size: int = 200, - parse_sitemap_options: ParseSitemapOptions | None = None, + persist_state_key: str | None = None, ) -> None: """Initialize the sitemap request loader. @@ -50,27 +116,64 @@ def __init__( include: List of glob or regex patterns to include URLs. exclude: List of glob or regex patterns to exclude URLs. max_buffer_size: Maximum number of URLs to buffer in memory. - parse_sitemap_options: Options for parsing sitemaps, such as `SitemapSource` and `max_urls`. http_client: the instance of `HttpClient` to use for fetching sitemaps. + persist_state_key: A key for persisting the loader's state in the KeyValueStore. + When provided, allows resuming from where it left off after interruption. + If None, no state persistence occurs. """ self._http_client = http_client - self._sitemap_urls = sitemap_urls self._include = include self._exclude = exclude self._proxy_info = proxy_info - self._parse_sitemap_options = parse_sitemap_options or ParseSitemapOptions() + self._max_buffer_size = max_buffer_size + + # Synchronization for queue operations + self._queue_has_capacity = asyncio.Event() + self._queue_has_capacity.set() + self._queue_lock = asyncio.Lock() + + # Initialize recoverable state + self._state = RecoverableState( + default_state=SitemapRequestLoaderState( + url_queue=deque(), + pending_sitemap_urls=deque(), + ), + persistence_enabled=bool(persist_state_key), + persist_state_key=persist_state_key or '', + logger=logger, + ) + + # Start background loading + self._loading_task = asyncio.create_task(self._load_sitemaps()) - self._handled_count = 0 - self._total_count = 0 + async def _get_state(self) -> SitemapRequestLoaderState: + """Initialize and return the current state.""" + async with self._queue_lock: + if self._state.is_initialized: + return self._state.current_value - # URL queue and tracking - self._url_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=max_buffer_size) - self._in_progress: set[str] = set() - self._processed_urls: set[str] = set() + await self._state.initialize() - # Loading state - self._loading_task = asyncio.create_task(self._load_sitemaps()) + # Initialize pending sitemaps on first run + has_sitemap_for_processing = ( + self._state.current_value.pending_sitemap_urls or self._state.current_value.in_progress_sitemap_url + ) + if not has_sitemap_for_processing and not self._state.current_value.completed: + self._state.current_value.pending_sitemap_urls.extend(self._sitemap_urls) + + if self._state.current_value.in_progress: + self._state.current_value.url_queue.extendleft(self._state.current_value.in_progress) + self._state.current_value.in_progress.clear() + + if ( + self._state.current_value.url_queue + and len(self._state.current_value.url_queue) >= self._max_buffer_size + ): + # Notify that the queue is full + self._queue_has_capacity.clear() + + return self._state.current_value def _check_url_patterns( self, @@ -105,73 +208,150 @@ def _check_url_patterns( async def _load_sitemaps(self) -> None: """Load URLs from sitemaps in the background.""" try: - async for item in parse_sitemap( - [SitemapSource(type='url', url=url) for url in self._sitemap_urls], - self._http_client, - proxy_info=self._proxy_info, - options=self._parse_sitemap_options, - ): - # Only process URL items (not nested sitemaps) - if isinstance(item, SitemapUrl): - url = item.loc - - # Skip if already processed - if url in 
+            # Get the current state.
+            while (state := await self._get_state()) and (state.pending_sitemap_urls or state.in_progress_sitemap_url):
+                # Pick the sitemap URL to parse.
+                sitemap_url = state.in_progress_sitemap_url
+                if not sitemap_url:
+                    sitemap_url = state.pending_sitemap_urls.popleft()
+                    # Skip sitemaps that have already been processed.
+                    if sitemap_url in state.processed_sitemap_urls:
                         continue
-
-                    # Check if URL should be included
-                    if not self._check_url_patterns(url, self._include, self._exclude):
+                    state.in_progress_sitemap_url = sitemap_url
+
+                parse_options = ParseSitemapOptions(max_depth=0, emit_nested_sitemaps=True)
+
+                async for item in parse_sitemap(
+                    [SitemapSource(type='url', url=sitemap_url)],
+                    self._http_client,
+                    proxy_info=self._proxy_info,
+                    options=parse_options,
+                ):
+                    if isinstance(item, NestedSitemap):
+                        # Queue the nested sitemap if it has not been seen yet.
+                        if item.loc not in state.pending_sitemap_urls and item.loc not in state.processed_sitemap_urls:
+                            state.pending_sitemap_urls.append(item.loc)
                         continue
-                    await self._url_queue.put(url)
-                    self._processed_urls.add(url)
-                    self._total_count += 1
+                    if isinstance(item, SitemapUrl):
+                        url = item.loc
+
+                        state = await self._get_state()
+
+                        # Skip if already processed
+                        if url in state.current_sitemap_processed_urls:
+                            continue
+
+                        # Check if URL should be included
+                        if not self._check_url_patterns(url, self._include, self._exclude):
+                            continue
+
+                        # Wait until the queue has capacity for another URL.
+                        await self._queue_has_capacity.wait()
+
+                        state = await self._get_state()
+                        async with self._queue_lock:
+                            state.url_queue.append(url)
+                            state.current_sitemap_processed_urls.add(url)
+                            state.total_count += 1
+                            if len(state.url_queue) >= self._max_buffer_size:
+                                # The buffer is full, so block producers until a URL is fetched.
+                                self._queue_has_capacity.clear()
+
+                # Finalize the current sitemap after processing.
+                state = await self._get_state()
+                current_sitemap_url = state.in_progress_sitemap_url
+                state.in_progress_sitemap_url = None
+                if current_sitemap_url:
+                    state.processed_sitemap_urls.add(current_sitemap_url)
+                state.current_sitemap_processed_urls.clear()
+
+            # Mark as completed after processing all sitemap URLs.
+            state.completed = True
         except Exception:
             logger.exception('Error loading sitemaps')
             raise
 
+    @override
     async def get_total_count(self) -> int:
         """Return the total number of URLs found so far."""
-        return self._total_count
+        state = await self._get_state()
+        return state.total_count
 
+    @override
+    async def get_handled_count(self) -> int:
+        """Return the number of URLs that have been handled."""
+        state = await self._get_state()
+        return state.handled_count
+
+    @override
     async def is_empty(self) -> bool:
         """Check if there are no more URLs to process."""
-        return self._url_queue.empty() and self._loading_task.done()
+        state = await self._get_state()
+        return not state.url_queue
 
+    @override
     async def is_finished(self) -> bool:
         """Check if all URLs have been processed."""
-        return self._url_queue.empty() and len(self._in_progress) == 0 and self._loading_task.done()
+        state = await self._get_state()
+        return not state.url_queue and len(state.in_progress) == 0 and self._loading_task.done()
 
+    @override
     async def fetch_next_request(self) -> Request | None:
         """Fetch the next request to process."""
-        while not (self._loading_task.done() and self._url_queue.empty()):
-            if self._url_queue.empty():
-                await asyncio.sleep(0.5)
+        while not (await self.is_finished()):
+            state = await self._get_state()
+            if not state.url_queue:
+                await asyncio.sleep(0.1)
                 continue
 
-            url = await self._url_queue.get()
+            async with self._queue_lock:
+                url = state.url_queue.popleft()
+
+                request = Request.from_url(url)
+                state.in_progress.add(request.url)
+                if len(state.url_queue) < self._max_buffer_size:
+                    self._queue_has_capacity.set()
 
-            request = Request.from_url(url)
-            self._in_progress.add(request.unique_key)
             return request
 
         return None
 
+    @override
     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as successfully handled."""
-        if request.unique_key in self._in_progress:
-            self._in_progress.remove(request.unique_key)
-            self._handled_count += 1
+        state = await self._get_state()
+        if request.url in state.in_progress:
+            state.in_progress.remove(request.url)
+            state.handled_count += 1
 
         return None
 
-    async def get_handled_count(self) -> int:
-        """Return the number of handled requests."""
-        return self._handled_count
-
     async def abort_loading(self) -> None:
         """Abort the sitemap loading process."""
         if self._loading_task and not self._loading_task.done():
             self._loading_task.cancel()
             with suppress(asyncio.CancelledError):
                 await self._loading_task
+
+    async def start(self) -> None:
+        """Start the sitemap loading process."""
+        if self._loading_task and not self._loading_task.done():
+            return
+        self._loading_task = asyncio.create_task(self._load_sitemaps())
+
+    async def close(self) -> None:
+        """Close the request loader."""
+        await self.abort_loading()
+        await self._state.teardown()
+
+    async def __aenter__(self) -> SitemapRequestLoader:
+        """Enter the context manager."""
+        await self.start()
+        return self
+
+    async def __aexit__(
+        self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_traceback: TracebackType | None
+    ) -> None:
+        """Exit the context manager."""
+        await self.close()
diff --git a/tests/unit/request_loaders/test_sitemap_request_loader.py b/tests/unit/request_loaders/test_sitemap_request_loader.py
index 6e73708cb2..b70f63144c 100644
--- a/tests/unit/request_loaders/test_sitemap_request_loader.py
+++ b/tests/unit/request_loaders/test_sitemap_request_loader.py
@@ -1,3 +1,4 @@
+import asyncio
 import base64
 import gzip
 
@@ -5,6 +6,7 @@
 
 from crawlee.http_clients._base import HttpClient
 from crawlee.request_loaders._sitemap_request_loader import SitemapRequestLoader
+from crawlee.storages import KeyValueStore
 
 BASIC_SITEMAP = """
 
@@ -103,3 +105,70 @@ async def test_abort_sitemap_loading(server_url: URL, http_client: HttpClient) -
         await sitemap_loader.mark_request_as_handled(item)
 
     assert await sitemap_loader.is_finished()
+
+
+async def test_create_persist_state_for_sitemap_loading(
+    server_url: URL, http_client: HttpClient, key_value_store: KeyValueStore
+) -> None:
+    sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode()))
+    persist_key = 'create_persist_state'
+    sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client, persist_state_key=persist_key)
+    assert await sitemap_loader.is_finished() is False
+
+    await sitemap_loader.close()
+
+    state_data = await key_value_store.get_value(persist_key)
+
+    assert state_data is not None
+    assert state_data['handledCount'] == 0
+
+
+async def test_data_persistence_for_sitemap_loading(
+    server_url: URL, http_client: HttpClient, key_value_store: KeyValueStore
+) -> None:
+    async def wait_for_sitemap_loader_not_empty(sitemap_loader: SitemapRequestLoader) -> None:
+        while await sitemap_loader.is_empty() and not await sitemap_loader.is_finished():  # noqa: ASYNC110
+            await asyncio.sleep(0.1)
+
+    sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode()))
+    persist_key = 'data_persist_state'
+    sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client, persist_state_key=persist_key)
+
+    # Give time to load
+    await asyncio.wait_for(wait_for_sitemap_loader_not_empty(sitemap_loader), timeout=2)
+
+    await sitemap_loader.close()
+
+    state_data = await key_value_store.get_value(persist_key)
+
+    assert state_data is not None
+    assert state_data['handledCount'] == 0
+    assert state_data['totalCount'] == 5
+    assert len(state_data['urlQueue']) == 5
+
+
+async def test_recovery_data_persistence_for_sitemap_loading(
+    server_url: URL, http_client: HttpClient, key_value_store: KeyValueStore
+) -> None:
+    sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode()))
+    persist_key = 'recovery_persist_state'
+    sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client, persist_state_key=persist_key)
+
+    item = await sitemap_loader.fetch_next_request()
+
+    assert item is not None
+    await sitemap_loader.mark_request_as_handled(item)
+
+    await sitemap_loader.close()
+
+    state_data = await key_value_store.get_value(persist_key)
+
+    assert state_data is not None
+    next_item_in_kvs = state_data['urlQueue'][0]
+
+    sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client, persist_state_key=persist_key)
+
+    item = await sitemap_loader.fetch_next_request()
+
+    assert item is not None
+    assert item.url == next_item_in_kvs
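The recovery test above captures the behaviour end users will rely on: a second loader created with the same `persist_state_key` continues from the persisted queue instead of starting over. A compact usage sketch of that flow outside the test suite follows; the key name and sitemap URL are illustrative, and `ImpitHttpClient` simply mirrors the docs example rather than being required by the loader.

```python
import asyncio

from crawlee import service_locator
from crawlee.http_clients import ImpitHttpClient
from crawlee.request_loaders import SitemapRequestLoader

# Persisted state must survive startup, mirroring the docs examples in this change.
service_locator.get_configuration().purge_on_start = False


async def run_once() -> None:
    async with (
        ImpitHttpClient() as http_client,
        SitemapRequestLoader(
            ['https://crawlee.dev/sitemap.xml'],
            http_client=http_client,
            persist_state_key='sitemap-state',
        ) as loader,
    ):
        if request := await loader.fetch_next_request():
            print(f'Processing {request.url}')
            await loader.mark_request_as_handled(request)


async def main() -> None:
    # The first call handles one URL and saves the state; the second call picks up
    # the next URL from the persisted queue instead of starting from scratch.
    await run_once()
    await run_once()


if __name__ == '__main__':
    asyncio.run(main())
```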