@@ -17,6 +17,7 @@ async def main() -> None:
max_buffer_size=500, # Buffer up to 500 URLs in memory
)

await asyncio.sleep(0.1) # Allow some time for the loader to start
while request := await sitemap_loader.fetch_next_request():
# Do something with it...

4 changes: 3 additions & 1 deletion src/crawlee/_utils/sitemap.py
@@ -9,6 +9,7 @@
from hashlib import sha256
from logging import getLogger
from typing import TYPE_CHECKING, Literal, TypedDict
from xml.sax import SAXParseException
from xml.sax.expatreader import ExpatParser
from xml.sax.handler import ContentHandler

@@ -192,7 +193,8 @@ async def flush(self) -> AsyncGenerator[_SitemapItem, None]:

def close(self) -> None:
"""Clean up resources."""
self._parser.close()
with suppress(SAXParseException):
self._parser.close()


def _get_parser(content_type: str = '', url: str | None = None) -> _XmlSitemapParser | _TxtSitemapParser:
229 changes: 179 additions & 50 deletions src/crawlee/request_loaders/_sitemap_request_loader.py
@@ -1,19 +1,25 @@
from __future__ import annotations

import asyncio
from collections import deque
from contextlib import suppress
from logging import getLogger
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Annotated, Any

from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import override

from crawlee import Request
from crawlee._utils.docs import docs_group
from crawlee._utils.globs import Glob
from crawlee._utils.sitemap import ParseSitemapOptions, SitemapSource, SitemapUrl, parse_sitemap
from crawlee._utils.recoverable_state import RecoverableState
from crawlee._utils.sitemap import NestedSitemap, ParseSitemapOptions, SitemapSource, SitemapUrl, parse_sitemap
from crawlee.request_loaders._request_loader import RequestLoader

if TYPE_CHECKING:
import re
from collections.abc import Sequence
from types import TracebackType

from crawlee.http_clients import HttpClient
from crawlee.proxy_configuration import ProxyInfo
@@ -23,6 +29,36 @@
logger = getLogger(__name__)


class SitemapRequestLoaderState(BaseModel):
Collaborator: Now that I see this from the outside, it would be great if you could write down how the persistence mechanism works in the docblock of this class.

Also, I don't see processed sitemap URLs being tracked in any way; is that intentional?

Author (@Mantisus): > I don't see processed sitemap URLs being tracked in any way, is that intentional?

Yes, I may be wrong, but I think that cyclic links are not expected in sitemaps. Because of this, we don't need to store links to processed sitemaps.

JS uses similar behavior: https://github.com/apify/crawlee/blob/master/packages/core/src/storages/sitemap_request_list.ts#L108

Collaborator: I wouldn't be surprised to encounter a cyclic sitemap somewhere, but I don't have a real-world example 🤷

Collaborator: Cyclic sitemaps aside, can you please briefly describe how this state model is used by the loader? Something like: "The crawler processes one sitemap at a time. The current one is kept in in_progress_sitemap_url..."

Author (@Mantisus): Sure. 🙂

The crawler processes one sitemap at a time. The current sitemap is stored in in_progress_sitemap_url. The parse_sitemap function parses the sitemap and returns elements as an async iterator. Each element retrieved from the iterator is processed based on its type. If the element is a NestedSitemap, its URL is added to pending_sitemap_urls. If the element is a SitemapUrl, the system checks whether it already exists in current_sitemap_processed_urls. If it exists, the loader was restarted from a saved state and the URL is skipped.

If the URL is new, it is first added to url_queue, then to current_sitemap_processed_urls, and total_count is incremented by 1. When all elements from the current sitemap iterator have been processed, in_progress_sitemap_url is set to None and current_sitemap_processed_urls is cleared. The next sitemap is retrieved from pending_sitemap_urls. If pending_sitemap_urls is empty, completed is set to True.

When fetch_next_request is called, a URL is extracted from url_queue and placed in in_progress. When mark_request_as_handled is called for the extracted URL, it is removed from in_progress and handled_count is incremented by 1.

During initial startup or restart after persistence, state validation occurs in _get_state. If both pending_sitemap_urls and in_progress_sitemap_url are empty and completed is False, this indicates a fresh start. In this case, self._sitemap_urls are moved to pending_sitemap_urls. Otherwise, the system is restarting from a persisted state. If in_progress contains any URLs, they are moved back to url_queue and in_progress is cleared.
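
A minimal, self-contained sketch of that flow, using plain data structures in place of the real RecoverableState, queue lock, and buffer-size handling (the LoaderState dataclass and fake_parse_sitemap below are illustrative stand-ins, not crawlee APIs):

from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass, field


@dataclass
class LoaderState:
    url_queue: deque[str] = field(default_factory=deque)
    pending_sitemap_urls: deque[str] = field(default_factory=deque)
    in_progress_sitemap_url: str | None = None
    current_sitemap_processed_urls: set[str] = field(default_factory=set)
    completed: bool = False
    total_count: int = 0


async def fake_parse_sitemap(url: str):
    # Stand-in for parse_sitemap(); yields ('sitemap', loc) for nested sitemaps
    # and ('url', loc) for page URLs.
    yield ('url', f'{url}/page-1')
    yield ('url', f'{url}/page-2')


async def load_sitemaps(state: LoaderState) -> None:
    while state.pending_sitemap_urls or state.in_progress_sitemap_url:
        # Resume the interrupted sitemap after a restart, otherwise take the next one.
        sitemap_url = state.in_progress_sitemap_url or state.pending_sitemap_urls.popleft()
        state.in_progress_sitemap_url = sitemap_url

        async for kind, loc in fake_parse_sitemap(sitemap_url):
            if kind == 'sitemap':
                if loc not in state.pending_sitemap_urls:
                    state.pending_sitemap_urls.append(loc)
            elif loc not in state.current_sitemap_processed_urls:
                # New URL: enqueue it and record it so a restart does not enqueue it twice.
                state.url_queue.append(loc)
                state.current_sitemap_processed_urls.add(loc)
                state.total_count += 1

        # The current sitemap is finished; reset the per-sitemap tracking.
        state.in_progress_sitemap_url = None
        state.current_sitemap_processed_urls.clear()

    state.completed = True


state = LoaderState(pending_sitemap_urls=deque(['https://example.com/sitemap.xml']))
asyncio.run(load_sitemaps(state))
print(state.total_count)  # 2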

Author (@Mantisus, Aug 25, 2025): Regarding cyclic sitemaps. According to sitemaps.org, only a sitemapindex should contain links to sitemaps, and a sitemapindex should not contain links to other sitemapindex files.
This means the hierarchy should be: a sitemapindex contains links to one or more sitemaps, where each sitemap contains one or more links to website pages.
If a website follows the recommended protocol, this should prevent circular sitemaps. However, a website might have multiple sitemapindex files that reference the same sitemap (I don't think search engines would like this, but it's possible).

Collaborator: > Sure. 🙂 The crawler processes one sitemap at a time. The current sitemap is stored in...

Cool. Now put it inside the docblock please 😁

> However, a website might have multiple sitemapindex files that reference the same sitemap (I don't think search engines would like this, but it's possible).

I could imagine a website implementing something like this as a scraper tarpit.

Author (@Mantisus): > Cool. Now put it inside the docblock please 😁

Added 🙂

> I could imagine a website implementing something like this as a scraper tarpit.

I added processed_sitemap_urls so that we can ensure we don't reprocess the same sitemap. 🙂
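
A small sketch of that guard under the same simplified model; processed_sitemap_urls mirrors the field name mentioned above, while enqueue_sitemap is an illustrative helper rather than part of the loader's API:

from collections import deque

pending_sitemap_urls: deque[str] = deque(['https://example.com/sitemap-index-a.xml'])
processed_sitemap_urls: set[str] = set()


def enqueue_sitemap(loc: str) -> None:
    # Skip sitemaps that were already processed or are already queued, so two
    # sitemapindex files pointing at the same sitemap (or a cycle) cannot make
    # the loader revisit it.
    if loc in processed_sitemap_urls or loc in pending_sitemap_urls:
        return
    pending_sitemap_urls.append(loc)


while pending_sitemap_urls:
    sitemap_url = pending_sitemap_urls.popleft()
    processed_sitemap_urls.add(sitemap_url)
    # Parsing would happen here; every nested sitemap found goes through enqueue_sitemap().
    enqueue_sitemap(sitemap_url)  # A repeated reference is ignored, so the loop terminates.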

"""State model for persisting sitemap request loader data."""

model_config = ConfigDict(populate_by_name=True)

url_queue: Annotated[deque[str], Field(alias='urlQueue')]
"""Queue of URLs extracted from sitemaps and ready for processing."""

in_progress: Annotated[set[str], Field(alias='inProgress')] = set()
"""Set of request IDs currently being processed."""

pending_sitemap_urls: Annotated[deque[str], Field(alias='pendingSitemapUrls')]
"""Queue of sitemap URLs that need to be fetched and processed."""

in_progress_sitemap_url: Annotated[str | None, Field(alias='inProgressSitemapUrl')] = None
"""The sitemap URL currently being processed."""

current_sitemap_processed_urls: Annotated[set[str], Field(alias='currentSitemapProcessedUrls')] = set()
"""URLs from the current sitemap that have been added to the queue."""

completed: Annotated[bool, Field(alias='sitemapCompleted')] = False
"""Whether all sitemaps have been fully processed."""

total_count: Annotated[int, Field(alias='totalCount')] = 0
"""Total number of URLs found and added to the queue from all processed sitemaps."""

handled_count: Annotated[int, Field(alias='handledCount')] = 0
"""Number of URLs that have been successfully handled."""


@docs_group('Request loaders')
class SitemapRequestLoader(RequestLoader):
"""A request loader that reads URLs from sitemap(s).
@@ -40,7 +76,7 @@ def __init__(
include: list[re.Pattern[Any] | Glob] | None = None,
exclude: list[re.Pattern[Any] | Glob] | None = None,
max_buffer_size: int = 200,
parse_sitemap_options: ParseSitemapOptions | None = None,
persist_state_key: str | None = None,
) -> None:
"""Initialize the sitemap request loader.

@@ -50,27 +86,53 @@ def __init__(
include: List of glob or regex patterns to include URLs.
exclude: List of glob or regex patterns to exclude URLs.
max_buffer_size: Maximum number of URLs to buffer in memory.
parse_sitemap_options: Options for parsing sitemaps, such as `SitemapSource` and `max_urls`.
http_client: The instance of `HttpClient` to use for fetching sitemaps.
persist_state_key: A key for persisting the loader's state in the KeyValueStore.
When provided, allows resuming from where it left off after interruption.
If None, no state persistence occurs.
"""
self._http_client = http_client

self._sitemap_urls = sitemap_urls
self._include = include
self._exclude = exclude
self._proxy_info = proxy_info
self._parse_sitemap_options = parse_sitemap_options or ParseSitemapOptions()
self._max_buffer_size = max_buffer_size

# Synchronization for queue operations
self._queue_has_capacity = asyncio.Event()
self._queue_has_capacity.set()
self._queue_lock = asyncio.Lock()

# Initialize recoverable state
self._state = RecoverableState(
default_state=SitemapRequestLoaderState(
url_queue=deque(),
pending_sitemap_urls=deque(),
),
persistence_enabled=bool(persist_state_key),
persist_state_key=persist_state_key or '',
logger=logger,
)

# Start background loading
self._loading_task = asyncio.create_task(self._load_sitemaps())

self._handled_count = 0
self._total_count = 0
async def _get_state(self) -> SitemapRequestLoaderState:
"""Initialize and return the current state."""
if self._state.is_initialized:
return self._state.current_value

# URL queue and tracking
self._url_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=max_buffer_size)
self._in_progress: set[str] = set()
self._processed_urls: set[str] = set()
await self._state.initialize()

# Loading state
self._loading_task = asyncio.create_task(self._load_sitemaps())
# Initialize pending sitemaps on first run
if not self._state.current_value.pending_sitemap_urls and not self._state.current_value.completed:
self._state.current_value.pending_sitemap_urls.extend(self._sitemap_urls)

if self._state.current_value.url_queue and len(self._state.current_value.url_queue) >= self._max_buffer_size:
# Notify that the queue is full
self._queue_has_capacity.clear()

return self._state.current_value

def _check_url_patterns(
self,
@@ -105,73 +167,140 @@ def _check_url_patterns(
async def _load_sitemaps(self) -> None:
"""Load URLs from sitemaps in the background."""
try:
async for item in parse_sitemap(
[SitemapSource(type='url', url=url) for url in self._sitemap_urls],
self._http_client,
proxy_info=self._proxy_info,
options=self._parse_sitemap_options,
):
# Only process URL items (not nested sitemaps)
if isinstance(item, SitemapUrl):
url = item.loc

# Skip if already processed
if url in self._processed_urls:
# Get actual state
while (state := await self._get_state()) and (state.pending_sitemap_urls or state.in_progress_sitemap_url):
# Get sitemap URL for parsing
sitemap_url = state.in_progress_sitemap_url
if not sitemap_url:
sitemap_url = state.pending_sitemap_urls.popleft()
state.in_progress_sitemap_url = sitemap_url

parse_options = ParseSitemapOptions(max_depth=0, emit_nested_sitemaps=True)

async for item in parse_sitemap(
[SitemapSource(type='url', url=sitemap_url)],
self._http_client,
proxy_info=self._proxy_info,
options=parse_options,
):
if isinstance(item, NestedSitemap):
# Add nested sitemap to queue
if item.loc not in state.pending_sitemap_urls:
state.pending_sitemap_urls.append(item.loc)
continue

# Check if URL should be included
if not self._check_url_patterns(url, self._include, self._exclude):
continue
if isinstance(item, SitemapUrl):
url = item.loc

await self._url_queue.put(url)
self._processed_urls.add(url)
self._total_count += 1
state = await self._get_state()

# Skip if already processed
if url in state.current_sitemap_processed_urls:
continue

# Check if URL should be included
if not self._check_url_patterns(url, self._include, self._exclude):
continue

# Check if we have capacity in the queue
await self._queue_has_capacity.wait()

async with self._queue_lock:
state = await self._get_state()
state.url_queue.append(url)
state.current_sitemap_processed_urls.add(url)
state.total_count += 1
if len(state.url_queue) >= self._max_buffer_size:
# Notify that the queue is full
self._queue_has_capacity.clear()

# Clear current sitemap after processing
state = await self._get_state()
state.in_progress_sitemap_url = None
state.current_sitemap_processed_urls.clear()

# Mark as completed after processing all sitemap urls
state.completed = True

except Exception:
logger.exception('Error loading sitemaps')
raise

@override
async def get_total_count(self) -> int:
"""Return the total number of URLs found so far."""
return self._total_count
state = await self._get_state()
return state.total_count

@override
async def get_handled_count(self) -> int:
"""Return the number of URLs that have been handled."""
state = await self._get_state()
return state.handled_count

@override
async def is_empty(self) -> bool:
"""Check if there are no more URLs to process."""
return self._url_queue.empty() and self._loading_task.done()
state = await self._get_state()
return not state.url_queue

@override
async def is_finished(self) -> bool:
"""Check if all URLs have been processed."""
return self._url_queue.empty() and len(self._in_progress) == 0 and self._loading_task.done()
state = await self._get_state()
return not state.url_queue and len(state.in_progress) == 0 and self._loading_task.done()

@override
async def fetch_next_request(self) -> Request | None:
"""Fetch the next request to process."""
while not (self._loading_task.done() and self._url_queue.empty()):
if self._url_queue.empty():
await asyncio.sleep(0.5)
continue
state = await self._get_state()
if not state.url_queue:
return None

url = await self._url_queue.get()
async with self._queue_lock:
url = state.url_queue.popleft()

request = Request.from_url(url)
self._in_progress.add(request.id)
return request
state.in_progress.add(request.id)
if len(state.url_queue) < self._max_buffer_size:
self._queue_has_capacity.set()

return None
return request

@override
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
"""Mark a request as successfully handled."""
if request.id in self._in_progress:
self._in_progress.remove(request.id)
self._handled_count += 1
state = await self._get_state()
if request.id in state.in_progress:
state.in_progress.remove(request.id)
state.handled_count += 1
return None

async def get_handled_count(self) -> int:
"""Return the number of handled requests."""
return self._handled_count

async def abort_loading(self) -> None:
"""Abort the sitemap loading process."""
if self._loading_task and not self._loading_task.done():
self._loading_task.cancel()
with suppress(asyncio.CancelledError):
await self._loading_task

async def start(self) -> None:
"""Start the sitemap loading process."""
if self._loading_task and not self._loading_task.done():
return
self._loading_task = asyncio.create_task(self._load_sitemaps())

async def close(self) -> None:
"""Close the request loader."""
await self.abort_loading()
await self._state.teardown()

async def __aenter__(self) -> SitemapRequestLoader:
"""Enter the context manager."""
await self.start()
return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_traceback: TracebackType | None
) -> None:
"""Exit the context manager."""
await self.close()
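
For reference, a hedged usage sketch of the loader with persistence enabled; the import paths and the HttpxHttpClient name are assumptions based on the wider crawlee package rather than anything shown in this diff:

import asyncio

from crawlee.http_clients import HttpxHttpClient  # assumed default HTTP client
from crawlee.request_loaders import SitemapRequestLoader  # assumed public export


async def main() -> None:
    # Entering the context manager starts the background loading task;
    # exiting it aborts loading and tears down the recoverable state.
    async with SitemapRequestLoader(
        sitemap_urls=['https://example.com/sitemap.xml'],
        http_client=HttpxHttpClient(),
        max_buffer_size=500,  # Buffer up to 500 URLs in memory
        persist_state_key='sitemap-loader-state',  # Enables resuming after an interruption
    ) as sitemap_loader:
        await asyncio.sleep(0.1)  # Allow some time for the loader to start
        while request := await sitemap_loader.fetch_next_request():
            # Process the request, then mark it as handled so handled_count
            # is tracked in the persisted state.
            await sitemap_loader.mark_request_as_handled(request)


if __name__ == '__main__':
    asyncio.run(main())

Without persist_state_key, the loader behaves as before and starts from scratch on every run.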