Merged
1 change: 1 addition & 0 deletions pyproject.toml
@@ -34,6 +34,7 @@ keywords = [
"scraping",
]
dependencies = [
"async-timeout>=5.0.1",
"cachetools>=5.5.0",
"colorama>=0.4.0",
"impit>=0.8.0",
39 changes: 38 additions & 1 deletion src/crawlee/_utils/time.py
@@ -3,11 +3,14 @@
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING

from async_timeout import Timeout, timeout

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import timedelta
from types import TracebackType

_SECONDS_PER_MINUTE = 60
_SECONDS_PER_HOUR = 3600
@@ -35,6 +38,40 @@ def measure_time() -> Iterator[TimerResult]:
result.cpu = after_cpu - before_cpu


class SharedTimeout:
Collaborator:
Nice idea. I like this a lot.

Just thinking out loud about where this could lead:
If we ever need more granular timeouts for specific steps, I think this class could easily be expanded to support that. I am also wondering whether the final context consumer (the request handler) could just use the timeout from here, if the timeout is set on the context (see my other comment).

Collaborator (Author):
I'm not sure I follow. Do you think the request handler should be limited by a shared timeout, or that it should have access to the remaining timeout "budget"?

Collaborator:
Please don't focus on the specific interface in my example; it is just about the capability.

Imagine that the context would be created something like this:

pipeline_timeout = SharedTimeout(...)

BasicCrawlingContext(
    ...,
    timeouts={
        "WholePipeline": pipeline_timeout,  # Maybe the other timeouts could somehow be limited by this one?
        "Navigation": pipeline_timeout.limited_to(NAVIGATION_LIMIT),
        "RequestHandler": pipeline_timeout.limited_to(HANDLER_LIMIT),
    },
)

And each timeout-protected context consumer would pick the relevant timeout from the context and apply it. Context consumers could even modify the timeouts of the steps that follow them.

For example, users could mutate the "RequestHandler" timeout in pre-navigation hooks.
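
Below is a minimal, hypothetical sketch of how such a limited_to helper could look on top of the SharedTimeout added in this PR. The helper name and the capped-child-budget behaviour come from the comment above, not from the change itself, and a real implementation would also have to feed the child's elapsed time back into the parent, which this sketch omits.

from datetime import timedelta

from crawlee._utils.time import SharedTimeout


def limited_to(parent: SharedTimeout, limit: timedelta) -> SharedTimeout:
    # The child's budget is whichever is smaller: the requested limit or the
    # time the parent has left right now. Reaches into a private field because
    # this is only an illustration of the proposed capability.
    remaining = parent._remaining_timeout  # noqa: SLF001
    return SharedTimeout(min(limit, remaining))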

Collaborator:
No need to block this change on this. If needed, we can discuss it here: #1596

"""Keeps track of a time budget shared by multiple independent async operations."""

def __init__(self, timeout: timedelta) -> None:
self._remaining_timeout = timeout
self._active_timeout: Timeout | None = None
self._activation_timestamp: float | None = None

async def __aenter__(self) -> timedelta:
if self._active_timeout is not None or self._activation_timestamp is not None:
raise RuntimeError('A shared timeout context cannot be entered twice at the same time')

self._activation_timestamp = time.monotonic()
self._active_timeout = new_timeout = timeout(self._remaining_timeout.total_seconds())
await new_timeout.__aenter__()
return self._remaining_timeout

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
if self._active_timeout is None or self._activation_timestamp is None:
raise RuntimeError('Logic error')

await self._active_timeout.__aexit__(exc_type, exc_value, exc_traceback)
elapsed = time.monotonic() - self._activation_timestamp
self._remaining_timeout = self._remaining_timeout - timedelta(seconds=elapsed)

self._active_timeout = None
self._activation_timestamp = None
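
For illustration only (not part of the diff), a minimal usage sketch of the class above: two sequential async with blocks draw from the same one-second budget, so the second block only receives whatever the first one left over, and asyncio.TimeoutError is raised via async-timeout once the budget runs out.

import asyncio
from datetime import timedelta

from crawlee._utils.time import SharedTimeout


async def main() -> None:
    budget = SharedTimeout(timedelta(seconds=1))

    async with budget:
        # Consumes roughly 0.3 s of the shared budget.
        await asyncio.sleep(0.3)

    async with budget as remaining:
        # Roughly 0.7 s of the original budget is left at this point.
        print(f'Budget left: {remaining.total_seconds():.2f} s')
        await asyncio.sleep(0.2)


asyncio.run(main())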


def format_duration(duration: timedelta | None) -> str:
"""Format a timedelta into a human-readable string with appropriate units."""
if duration is None:
27 changes: 18 additions & 9 deletions src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py
@@ -5,13 +5,16 @@
from abc import ABC
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Generic
from weakref import WeakKeyDictionary

from more_itertools import partition
from pydantic import ValidationError
from typing_extensions import NotRequired, TypeVar

from crawlee._request import Request, RequestOptions
from crawlee._types import BasicCrawlingContext
from crawlee._utils.docs import docs_group
from crawlee._utils.time import SharedTimeout
from crawlee._utils.urls import to_absolute_url_iterator
from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline
from crawlee.errors import SessionError
@@ -25,7 +28,7 @@
from typing_extensions import Unpack

from crawlee import RequestTransformAction
from crawlee._types import BasicCrawlingContext, EnqueueLinksKwargs, ExtractLinksFunction
from crawlee._types import EnqueueLinksKwargs, ExtractLinksFunction

from ._abstract_http_parser import AbstractHttpParser

@@ -76,6 +79,7 @@ def __init__(
self._parser = parser
self._navigation_timeout = navigation_timeout or timedelta(minutes=1)
self._pre_navigation_hooks: list[Callable[[BasicCrawlingContext], Awaitable[None]]] = []
self._shared_navigation_timeouts = WeakKeyDictionary[BasicCrawlingContext, SharedTimeout]()

if '_context_pipeline' not in kwargs:
raise ValueError(
@@ -128,8 +132,12 @@ def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpC
async def _execute_pre_navigation_hooks(
self, context: BasicCrawlingContext
) -> AsyncGenerator[BasicCrawlingContext, None]:
self._shared_navigation_timeouts[context] = SharedTimeout(self._navigation_timeout)
Collaborator:
I am not sure about this. The context is expanded by many pipeline steps, so this mapping is not very flexible: it will break if you try to apply SharedTimeout across pipeline steps that work on an expanded context.

Maybe it would be better to keep the timeout on the context itself instead of in this mapping on the crawler? I think it belongs there conceptually as well: the crawler sets the timeout, but it belongs to the context. Also, only the consumers of this specific context should have access to the timeout; there is no reason to keep it global for the whole crawler.

Collaborator (Author):
Same here 🙂 I want to avoid exposing the timeout to the request handler, mostly because it just doesn't make sense to me to do so.

Since the tests are now failing because of this, I guess we can agree that this approach is not optimal - I'll iterate on it for a while.
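
For context, a hypothetical sketch of the alternative described above, where the shared budget lives on the context itself rather than in a crawler-level WeakKeyDictionary. The context class and field names are made up for illustration; this is not what the PR implements.

from dataclasses import dataclass
from datetime import timedelta

from crawlee._utils.time import SharedTimeout


@dataclass
class HypotheticalCrawlingContext:
    # The shared navigation budget travels with the context, so every pipeline
    # step (including steps that expand the context) can draw from the same
    # budget without any crawler-level lookup keyed on the context object.
    url: str
    navigation_timeout: SharedTimeout


async def navigate(context: HypotheticalCrawlingContext) -> None:
    async with context.navigation_timeout as remaining:
        # Navigate within whatever budget the earlier pipeline steps left over.
        print(f'Navigating {context.url} with {remaining.total_seconds():.1f} s left')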


for hook in self._pre_navigation_hooks:
await hook(context)
async with self._shared_navigation_timeouts[context]:
await hook(context)

yield context

async def _parse_http_response(
Expand Down Expand Up @@ -232,13 +240,14 @@ async def _make_http_request(self, context: BasicCrawlingContext) -> AsyncGenera
Yields:
The original crawling context enhanced by HTTP response.
"""
result = await self._http_client.crawl(
request=context.request,
session=context.session,
proxy_info=context.proxy_info,
statistics=self._statistics,
timeout=self._navigation_timeout,
)
async with self._shared_navigation_timeouts[context] as remaining_timeout:
result = await self._http_client.crawl(
request=context.request,
session=context.session,
proxy_info=context.proxy_info,
statistics=self._statistics,
timeout=remaining_timeout,
)

yield HttpCrawlingContext.from_basic_crawling_context(context=context, http_response=result.http_response)

23 changes: 17 additions & 6 deletions src/crawlee/crawlers/_playwright/_playwright_crawler.py
@@ -6,6 +6,7 @@
from datetime import timedelta
from functools import partial
from typing import TYPE_CHECKING, Any, Generic, Literal
from weakref import WeakKeyDictionary

import playwright.async_api
from more_itertools import partition
@@ -14,10 +15,14 @@

from crawlee import service_locator
from crawlee._request import Request, RequestOptions
from crawlee._types import ConcurrencySettings
from crawlee._types import (
BasicCrawlingContext,
ConcurrencySettings,
)
from crawlee._utils.blocked import RETRY_CSS_SELECTORS
from crawlee._utils.docs import docs_group
from crawlee._utils.robots import RobotsTxtFile
from crawlee._utils.time import SharedTimeout
from crawlee._utils.urls import to_absolute_url_iterator
from crawlee.browsers import BrowserPool
from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline
@@ -46,7 +51,6 @@

from crawlee import RequestTransformAction
from crawlee._types import (
BasicCrawlingContext,
EnqueueLinksKwargs,
ExtractLinksFunction,
HttpHeaders,
@@ -145,6 +149,8 @@ def __init__(
if configuration is not None:
service_locator.set_configuration(configuration)

self._shared_navigation_timeouts = WeakKeyDictionary[BasicCrawlingContext, SharedTimeout]()

if browser_pool:
# Raise an exception if browser_pool is provided together with other browser-related arguments.
if any(
@@ -235,9 +241,13 @@ async def _open_page(
block_requests=partial(block_requests, page=crawlee_page.page),
)

self._shared_navigation_timeouts[pre_navigation_context] = SharedTimeout(self._navigation_timeout)

async with browser_page_context(crawlee_page.page):
for hook in self._pre_navigation_hooks:
await hook(pre_navigation_context)
async with self._shared_navigation_timeouts[pre_navigation_context]:
await hook(pre_navigation_context)

yield pre_navigation_context

def _prepare_request_interceptor(
@@ -306,9 +316,10 @@ async def _navigate(
await context.page.route(context.request.url, route_handler)

try:
response = await context.page.goto(
context.request.url, timeout=self._navigation_timeout.total_seconds() * 1000
)
async with self._shared_navigation_timeouts[context] as remaining_timeout:
response = await context.page.goto(
context.request.url, timeout=remaining_timeout.total_seconds() * 1000
)
except playwright.async_api.TimeoutError as exc:
raise asyncio.TimeoutError from exc

57 changes: 57 additions & 0 deletions tests/unit/_utils/test_shared_timeout.py
@@ -0,0 +1,57 @@
import asyncio
from datetime import timedelta

import pytest

from crawlee._utils.time import SharedTimeout, measure_time


async def test_shared_timeout_tracks_elapsed_time() -> None:
timeout_duration = timedelta(seconds=1)
shared_timeout = SharedTimeout(timeout_duration)

# First usage
async with shared_timeout:
await asyncio.sleep(0.2)

# Second usage - should have less time remaining
async with shared_timeout as remaining:
assert remaining < timedelta(seconds=0.85)
assert remaining > timedelta(seconds=0)


async def test_shared_timeout_expires() -> None:
timeout_duration = timedelta(seconds=0.1)
shared_timeout = SharedTimeout(timeout_duration)

with measure_time() as elapsed, pytest.raises(asyncio.TimeoutError):
async with shared_timeout:
await asyncio.sleep(0.5)

assert elapsed.wall is not None
assert elapsed.wall < 0.3


async def test_shared_timeout_cannot_be_nested() -> None:
timeout_duration = timedelta(seconds=1)
shared_timeout = SharedTimeout(timeout_duration)

async with shared_timeout:
with pytest.raises(RuntimeError, match='cannot be entered twice'):
async with shared_timeout:
pass


async def test_shared_timeout_multiple_sequential_uses() -> None:
"""Test that SharedTimeout can be used multiple times sequentially."""
timeout_duration = timedelta(seconds=1)
shared_timeout = SharedTimeout(timeout_duration)

for _ in range(5):
async with shared_timeout:
await asyncio.sleep(0.05)

# Should have consumed roughly 0.25 seconds
async with shared_timeout as remaining:
assert remaining < timedelta(seconds=0.8)
assert remaining > timedelta(seconds=0)
18 changes: 18 additions & 0 deletions tests/unit/crawlers/_beautifulsoup/test_beautifulsoup_crawler.py
@@ -366,6 +366,24 @@ async def test_navigation_timeout_on_slow_request(server_url: URL, http_client:
assert isinstance(failed_request_handler.call_args[0][1], asyncio.TimeoutError)


async def test_navigation_timeout_applies_to_hooks(server_url: URL) -> None:
crawler = BeautifulSoupCrawler(
navigation_timeout=timedelta(seconds=1),
max_request_retries=0,
)

request_handler = mock.AsyncMock()
crawler.router.default_handler(request_handler)
crawler.pre_navigation_hook(lambda _: asyncio.sleep(1))

# Pre-navigation hook takes 1 second (exceeds navigation timeout), so the URL will not be handled
result = await crawler.run([str(server_url)])

assert result.requests_failed == 1
assert result.requests_finished == 0
assert request_handler.call_count == 0


async def test_slow_navigation_does_not_count_toward_handler_timeout(server_url: URL, http_client: HttpClient) -> None:
crawler = BeautifulSoupCrawler(
http_client=http_client,
18 changes: 18 additions & 0 deletions tests/unit/crawlers/_playwright/test_playwright_crawler.py
@@ -952,6 +952,24 @@ async def test_navigation_timeout_on_slow_page_load(server_url: URL) -> None:
assert isinstance(failed_request_handler.call_args[0][1], asyncio.TimeoutError)


async def test_navigation_timeout_applies_to_hooks(server_url: URL) -> None:
crawler = PlaywrightCrawler(
navigation_timeout=timedelta(seconds=0.5),
max_request_retries=0,
)

request_handler = AsyncMock()
crawler.router.default_handler(request_handler)
crawler.pre_navigation_hook(lambda _: asyncio.sleep(1))

# Pre-navigation hook takes 1 second (exceeds navigation timeout), so the URL will not be handled
result = await crawler.run([str(server_url)])

assert result.requests_failed == 1
assert result.requests_finished == 0
assert request_handler.call_count == 0


async def test_slow_navigation_does_not_count_toward_handler_timeout(server_url: URL) -> None:
crawler = PlaywrightCrawler(
request_handler_timeout=timedelta(seconds=0.5),
6 changes: 4 additions & 2 deletions uv.lock

Some generated files are not rendered by default.