Commit 7dd0f66

WIP, fix failing tests
1 parent 3aa0215

4 files changed, +75 -9 lines changed

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 13 additions & 2 deletions

@@ -11,7 +11,7 @@
 from asyncio import CancelledError
 from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence
 from contextlib import AsyncExitStack, suppress
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
 from functools import partial
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Generic, Literal, cast
@@ -56,7 +56,7 @@
     SessionError,
     UserDefinedErrorHandlerError,
 )
-from crawlee.events._types import Event, EventCrawlerStatusData
+from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData
 from crawlee.http_clients import ImpitHttpClient
 from crawlee.router import Router
 from crawlee.sessions import SessionPool
@@ -440,6 +440,7 @@ def __init__(
         self._statistics = statistics or cast(
             'Statistics[TStatisticsState]',
             Statistics.with_default_state(
+                persistence_enabled=True,
                 periodic_message_logger=self._logger,
                 statistics_log_format=self._statistics_log_format,
                 log_message='Current request statistics:',
@@ -744,6 +745,16 @@ async def _run_crawler(self) -> None:
 
         await self._autoscaled_pool.run()
 
+        # Emit a PERSIST_STATE event when the crawler is finishing, so that listeners can persist their state if needed.
+        if not self.statistics.state.crawler_last_started_at:
+            raise RuntimeError('Statistics.state.crawler_last_started_at not set.')
+        run_duration = datetime.now(timezone.utc) - self.statistics.state.crawler_last_started_at
+        self._statistics.state.crawler_runtime = self.statistics.state.crawler_runtime + run_duration
+        self._service_locator.get_event_manager().emit(
+            event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False)
+        )
+        await asyncio.sleep(10)
+
     async def add_requests(
         self,
         requests: Sequence[str | Request],
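
Note on the change above: rather than recomputing the runtime from the current instance start, the crawler now accumulates each run's duration into the persisted statistics state. A minimal sketch of that accumulation pattern, using hypothetical stand-in types rather than the actual Crawlee classes:

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class SketchState:
    # Mirrors the two statistics fields touched in the diff above.
    crawler_last_started_at: datetime | None = None
    crawler_runtime: timedelta = timedelta()

def finish_run(state: SketchState) -> None:
    # Add this run's wall-clock duration to the cumulative runtime, so the
    # total survives restarts once the state is persisted and reloaded.
    if state.crawler_last_started_at is None:
        raise RuntimeError('crawler_last_started_at not set')
    state.crawler_runtime += datetime.now(timezone.utc) - state.crawler_last_started_at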

src/crawlee/statistics/_statistics.py

Lines changed: 2 additions & 3 deletions

@@ -250,8 +250,7 @@ def calculate(self) -> FinalStatistics:
         if self._instance_start is None:
             raise RuntimeError('The Statistics object is not initialized')
 
-        crawler_runtime = datetime.now(timezone.utc) - self._instance_start
-        total_minutes = crawler_runtime.total_seconds() / 60
+        total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
         serialized_state = state.model_dump(by_alias=False)
 
@@ -262,7 +261,7 @@
             requests_failed_per_minute=math.floor(state.requests_failed / total_minutes) if total_minutes else 0,
             request_total_duration=state.request_total_finished_duration + state.request_total_failed_duration,
             requests_total=state.requests_failed + state.requests_finished,
-            crawler_runtime=crawler_runtime,
+            crawler_runtime=state.crawler_runtime,
             requests_finished=state.requests_finished,
             requests_failed=state.requests_failed,
             retry_histogram=serialized_state['request_retry_histogram'],
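
With this change, calculate() derives the per-minute throughput figures from the persisted cumulative runtime instead of the lifetime of the current Statistics instance. A small worked example of the arithmetic, with values invented for illustration:

import math
from datetime import timedelta

# Suppose 3 requests finished over a cumulative runtime of 90 seconds,
# accumulated across two separate runs.
crawler_runtime = timedelta(seconds=90)
requests_finished = 3

total_minutes = crawler_runtime.total_seconds() / 60  # 1.5
requests_finished_per_minute = math.floor(requests_finished / total_minutes) if total_minutes else 0
print(requests_finished_per_minute)  # 2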

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 56 additions & 0 deletions

@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 import asyncio
+import concurrent.futures
 import json
 import logging
 import os
@@ -1643,3 +1644,58 @@ async def handler(context: BasicCrawlingContext) -> None:
 
     # Crawler should not fall back to the default storage after the purge
     assert await unrelated_rq.fetch_next_request() == unrelated_request
+
+
+async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
+    """Run a crawler and return its statistics state.
+
+    Must be defined like this to be picklable for ProcessPoolExecutor."""
+    service_locator.set_configuration(
+        Configuration(
+            crawlee_storage_dir=storage_dir,  # type: ignore[call-arg]
+            purge_on_start=False,
+        )
+    )
+
+    async def request_handler(context: BasicCrawlingContext) -> None:
+        context.log.info(f'Processing {context.request.url} ...')
+
+    crawler = BasicCrawler(
+        request_handler=request_handler,
+        concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
+    )
+
+    await crawler.run(requests)
+    return crawler.statistics.state
+
+
+def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
+    return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
+
+
+async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
+    """Test that crawler statistics persist and are loaded correctly.
+
+    This test simulates starting the crawler process twice and checks that the statistics include the first run."""
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Crawl 2 requests in the first run and automatically persist the state.
+        first_run_state = executor.submit(
+            _process_run_crawler,
+            requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
+            storage_dir=str(tmp_path),
+        ).result()
+        assert first_run_state.requests_finished == 2
+
+        # Crawl 1 additional request in the second run, but reuse the previously persisted state.
+        second_run_state = executor.submit(
+            _process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
+        ).result()
+        assert second_run_state.requests_finished == 3
+
+        assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
+        assert first_run_state.crawler_finished_at
+        assert second_run_state.crawler_finished_at
+
+        assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at
+        assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
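
The test launches each crawl in a separate process so that every run starts from a fresh interpreter and must reload the persisted state; ProcessPoolExecutor is also why the helpers above sit at module level, since submit() has to pickle the callable and its arguments. A minimal standalone sketch of that pattern, with illustrative names not taken from the codebase:

import asyncio
import concurrent.futures

async def job(n: int) -> int:
    await asyncio.sleep(0)  # Stand-in for real async work.
    return n * 2

def run_job(n: int) -> int:
    # Worker-process entry point: each worker runs its own event loop.
    return asyncio.run(job(n))

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit() pickles the callable and its arguments, which is why
        # nested functions or lambdas would fail here.
        assert executor.submit(run_job, 21).result() == 42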

tests/unit/test_configuration.py

Lines changed: 4 additions & 4 deletions

@@ -41,10 +41,10 @@ async def test_storage_not_persisted_when_disabled(tmp_path: Path, server_url: U
     )
     storage_client = MemoryStorageClient()
 
-    crawler = HttpCrawler(
-        configuration=configuration,
-        storage_client=storage_client,
-    )
+    service_locator.set_configuration(configuration)
+    service_locator.set_storage_client(storage_client)
+
+    crawler = HttpCrawler()
 
     @crawler.router.default_handler
     async def default_handler(context: HttpCrawlingContext) -> None:
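
The test now registers its configuration and storage client through the global service_locator instead of passing them to the crawler constructor. A stripped-down sketch of the service-locator pattern in play here, illustrative only and not Crawlee's actual implementation:

class SketchServiceLocator:
    def __init__(self) -> None:
        self._configuration = None

    def set_configuration(self, configuration: dict) -> None:
        self._configuration = configuration

    def get_configuration(self) -> dict:
        # Consumers resolve their dependencies at use time instead of
        # receiving them through constructor arguments.
        if self._configuration is None:
            raise RuntimeError('Configuration was not set')
        return self._configuration

locator = SketchServiceLocator()
locator.set_configuration({'purge_on_start': False})
assert locator.get_configuration() == {'purge_on_start': False}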
