
Commit 0d6c3f6

fix: Fix crawler_runtime not being updated during the run, only at the end (#1540)
### Description

- Fix `BasicCrawler.statistics.state.crawler_runtime` so that it is updated on each `BasicCrawler.statistics.calculate()` call while the statistics are still active.
- Do not update `BasicCrawler.statistics.state.crawler_runtime` on `BasicCrawler.statistics.calculate()` when the statistics are already deactivated, to avoid confusing differences between the logged and the persisted state.

### Issues

Closes: #1541

### Testing

- Added a test.

### Checklist

- [ ] CI passed
1 parent b8afba7 commit 0d6c3f6
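In practice, this means `crawler.statistics.calculate()` can now be polled for an up-to-date `crawler_runtime` while a run is in progress. A minimal sketch of that usage (the handler body, polling interval, and placeholder URL are illustrative, not part of this commit):

```python
import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def handler(_: BasicCrawlingContext) -> None:
        await asyncio.sleep(1)

    # Run the crawler in the background while polling its statistics.
    crawler_task = asyncio.create_task(crawler.run(['https://a.placeholder.com']))

    # Before this fix, crawler_runtime kept its initial value until the run
    # finished; now each calculate() call refreshes it while the statistics
    # are active.
    for _ in range(3):
        await asyncio.sleep(0.5)
        crawler.statistics.calculate()
        print(crawler.statistics.state.crawler_runtime)

    await crawler_task


if __name__ == '__main__':
    asyncio.run(main())
```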

File tree

3 files changed: +54 −28 lines


src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -71,7 +71,6 @@ def __init__(self) -> None:
     async def __aenter__(self) -> Self:
         self._active = True
         await self._state.initialize()
-        self._after_initialize()
         return self
 
     async def __aexit__(
```

src/crawlee/statistics/_statistics.py

Lines changed: 28 additions & 27 deletions
```diff
@@ -1,6 +1,7 @@
 # Inspiration: https://github.com/apify/crawlee/blob/v3.9.2/packages/core/src/crawlers/statistics.ts
 from __future__ import annotations
 
+import asyncio
 import math
 import time
 from datetime import datetime, timedelta, timezone
@@ -84,8 +85,6 @@ def __init__(
         self._id = Statistics.__next_id
         Statistics.__next_id += 1
 
-        self._instance_start: datetime | None = None
-
         self.error_tracker = ErrorTracker(
             save_error_snapshots=save_error_snapshots,
             snapshot_kvs_name=persist_state_kvs_name,
@@ -111,6 +110,9 @@ def __init__(
         # Flag to indicate the context state.
         self._active = False
 
+        # Pre-existing runtime offset, that can be non-zero when restoring serialized state from KVS.
+        self._runtime_offset = timedelta(seconds=0)
+
     def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]:
         """Create near copy of the `Statistics` with replaced `state_model`."""
         new_statistics: Statistics[TNewStatisticsState] = Statistics(
```
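The new `_runtime_offset` seeds the runtime with whatever was restored from the key-value store, so runtime keeps accumulating across resumed runs. A worked illustration of the intended arithmetic (the durations are made up):

```python
from datetime import timedelta

# First run: 5 s of crawler_runtime was persisted to the KVS.
persisted_runtime = timedelta(seconds=5)

# Second run: __aenter__ restores the serialized state, so the offset
# starts at the persisted runtime rather than at zero.
runtime_offset = persisted_runtime

# While the second run is active, the reported runtime is the offset
# plus the time elapsed since crawler_last_started_at.
elapsed_in_current_run = timedelta(seconds=2)
assert runtime_offset + elapsed_in_current_run == timedelta(seconds=7)
```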
```diff
@@ -165,14 +167,17 @@ async def __aenter__(self) -> Self:
         if self._active:
             raise RuntimeError(f'The {self.__class__.__name__} is already active.')
 
-        self._active = True
-        self._instance_start = datetime.now(timezone.utc)
-
         await self._state.initialize()
-        self._after_initialize()
 
+        self._runtime_offset = self.state.crawler_runtime
+
+        # Start periodic logging and let it print initial state before activation.
         self._periodic_logger.start()
+        await asyncio.sleep(0.01)
+        self._active = True
 
+        self.state.crawler_last_started_at = datetime.now(timezone.utc)
+        self.state.crawler_started_at = self.state.crawler_started_at or self.state.crawler_last_started_at
         return self
 
     async def __aexit__(
@@ -191,14 +196,16 @@ async def __aexit__(
 
         if not self.state.crawler_last_started_at:
             raise RuntimeError('Statistics.state.crawler_last_started_at not set.')
-        self.state.crawler_finished_at = datetime.now(timezone.utc)
-        self.state.crawler_runtime += self.state.crawler_finished_at - self.state.crawler_last_started_at
-
-        await self._state.teardown()
 
+        # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime
         await self._periodic_logger.stop()
+        self.state.crawler_finished_at = datetime.now(timezone.utc)
+        self.state.crawler_runtime = (
+            self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at
+        )
 
         self._active = False
+        await self._state.teardown()
 
     @property
     def state(self) -> TStatisticsState:
```
```diff
@@ -255,10 +262,19 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None:
 
         del self._requests_in_progress[request_id_or_key]
 
+    def _update_crawler_runtime(self) -> None:
+        current_run_duration = (
+            (datetime.now(timezone.utc) - self.state.crawler_last_started_at)
+            if self.state.crawler_last_started_at
+            else timedelta()
+        )
+        self.state.crawler_runtime = current_run_duration + self._runtime_offset
+
     def calculate(self) -> FinalStatistics:
         """Calculate the current statistics."""
-        if self._instance_start is None:
-            raise RuntimeError('The Statistics object is not initialized')
+        if self._active:
+            # Only update state when active. If not, just report the last known runtime.
+            self._update_crawler_runtime()
 
         total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
```
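Taken together, `calculate()` now refreshes `crawler_runtime` only while the statistics are active and reports the frozen final value afterwards. A minimal sketch of that contract, assuming `Statistics.with_default_state()` as the constructor and that the state remains readable after the context exits (timings are illustrative):

```python
import asyncio
from datetime import timedelta

from crawlee.statistics import Statistics


async def main() -> None:
    statistics = Statistics.with_default_state()

    async with statistics:
        await asyncio.sleep(0.2)
        # Active: calculate() updates state.crawler_runtime in place.
        statistics.calculate()
        assert statistics.state.crawler_runtime >= timedelta(seconds=0.2)

    # Deactivated: calculate() reports the last known runtime without
    # mutating it, so logged and persisted values stay in sync.
    frozen = statistics.state.crawler_runtime
    statistics.calculate()
    assert statistics.state.crawler_runtime == frozen


if __name__ == '__main__':
    asyncio.run(main())
```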
```diff
@@ -291,21 +307,6 @@ def _log(self) -> None:
         else:
             self._periodic_message_logger.info(self._log_message, extra=stats.to_dict())
 
-    def _after_initialize(self) -> None:
-        state = self._state.current_value
-
-        if state.crawler_started_at is None:
-            state.crawler_started_at = datetime.now(timezone.utc)
-
-        if state.stats_persisted_at is not None and state.crawler_last_started_at:
-            self._instance_start = datetime.now(timezone.utc) - (
-                state.stats_persisted_at - state.crawler_last_started_at
-            )
-        elif state.crawler_last_started_at:
-            self._instance_start = state.crawler_last_started_at
-
-        state.crawler_last_started_at = self._instance_start
-
     def _save_retry_count_for_request(self, record: RequestProcessingRecord) -> None:
         retry_count = record.retry_count
         state = self._state.current_value
```

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 26 additions & 0 deletions
```diff
@@ -1701,3 +1701,29 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
 
     assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at
     assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
+
+
+async def test_crawler_intermediate_statistics() -> None:
+    """Test that crawler statistics are correctly updating total runtime on every calculate call."""
+    crawler = BasicCrawler()
+    check_time = timedelta(seconds=0.1)
+
+    async def wait_for_statistics_initialization() -> None:
+        while not crawler.statistics.active:  # noqa: ASYNC110 # It is ok for tests.
+            await asyncio.sleep(0.1)
+
+    @crawler.router.default_handler
+    async def handler(_: BasicCrawlingContext) -> None:
+        await asyncio.sleep(check_time.total_seconds() * 5)
+
+    # Start crawler and wait until statistics are initialized.
+    crawler_task = asyncio.create_task(crawler.run(['https://a.placeholder.com']))
+    await wait_for_statistics_initialization()
+
+    # Wait some time and check that runtime is updated.
+    await asyncio.sleep(check_time.total_seconds())
+    crawler.statistics.calculate()
+    assert crawler.statistics.state.crawler_runtime >= check_time
+
+    # Wait for crawler to finish
+    await crawler_task
```
