
Commit 6a44f17

fix: Ensure persist state event emission when exiting EventManager context (#1562)
### Description

- Fixes the root cause of the flakiness in `test_crawler_statistics_persistence`.
- Previously, `BasicCrawler` manually emitted a persist state event when exiting. This is now done by the `EventManager`.
- Statistics used to be persisted twice, and due to a race condition the old state could sometimes win. Now the statistics persist themselves when exiting their own context, instead of reacting to the crawler-emitted event.
- `StatisticsState.crawler_runtime` changed to a computed field in a backwards-compatible way. This pushes the runtime calculation into the state and ensures consistency between attribute access and persistence. (It also prevents a theoretical race condition where automatic persistence triggers almost at the same time as the crawler finishes, recreating the scenario described above.)

### Issues

- Closes: #1560

### Testing

- Stress testing in CI
- Added unit test

### Checklist

- [x] CI passed
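The consistency argument in the last bullet comes down to a small contrast. A minimal sketch, independent of crawlee's actual classes (both types below are illustrative): a stored runtime is only as fresh as its last writer, while a computed one can never be observed stale.

```python
from datetime import datetime, timedelta, timezone


class StoredRuntime:
    """Old approach (illustrative): a writer must remember to refresh the value."""

    def __init__(self) -> None:
        self.started_at = datetime.now(timezone.utc)
        self.runtime = timedelta()  # Stale until refresh() is called.

    def refresh(self) -> None:
        self.runtime = datetime.now(timezone.utc) - self.started_at


class ComputedRuntime:
    """New approach (illustrative): every read derives the value, so a persist
    triggered at any moment serializes exactly what an attribute read would see."""

    def __init__(self) -> None:
        self.started_at = datetime.now(timezone.utc)

    @property
    def runtime(self) -> timedelta:
        return datetime.now(timezone.utc) - self.started_at
```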
1 parent 4b41e97 commit 6a44f17

File tree

6 files changed (+50, -28 lines)

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 1 addition & 4 deletions
```diff
@@ -56,7 +56,7 @@
     SessionError,
     UserDefinedErrorHandlerError,
 )
-from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData
+from crawlee.events._types import Event, EventCrawlerStatusData
 from crawlee.http_clients import ImpitHttpClient
 from crawlee.router import Router
 from crawlee.sessions import SessionPool
@@ -751,9 +751,6 @@ async def _run_crawler(self) -> None:
 
         await self._autoscaled_pool.run()
 
-        # Emit PERSIST_STATE event when crawler is finishing to allow listeners to persist their state if needed
-        event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False))
-
     async def add_requests(
         self,
         requests: Sequence[str | Request],
```
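The net effect of the removal above is that code running inside an `EventManager` context no longer needs to emit a final `Event.PERSIST_STATE` itself. A minimal sketch of the resulting contract, assuming the usual `crawlee.events` exports and keyword-only `on()` signature (the listener body is illustrative):

```python
import asyncio

from crawlee.events import EventManager
from crawlee.events._types import Event, EventPersistStateData


async def main() -> None:
    event_manager = EventManager()

    def on_persist_state(event_data: EventPersistStateData) -> None:
        # Illustrative listener; real code would write its state to storage here.
        print(f'persisting state (is_migrating={event_data.is_migrating})')

    event_manager.on(event=Event.PERSIST_STATE, listener=on_persist_state)

    async with event_manager:
        ...  # Run the crawl; no manual PERSIST_STATE emission is needed here anymore.
    # __aexit__ stops the periodic emission and fires one final PERSIST_STATE,
    # so the listener always observes the latest state.


asyncio.run(main())
```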

src/crawlee/events/_event_manager.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -130,11 +130,13 @@ async def __aexit__(
         if not self._active:
             raise RuntimeError(f'The {self.__class__.__name__} is not active.')
 
+        # Stop persist state event periodic emission and manually emit last one to ensure latest state is saved.
+        await self._emit_persist_state_event_rec_task.stop()
+        await self._emit_persist_state_event()
         await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
         self._event_emitter.remove_all_listeners()
         self._listener_tasks.clear()
         self._listeners_to_wrappers.clear()
-        await self._emit_persist_state_event_rec_task.stop()
         self._active = False
 
     @overload
```
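The ordering in this hunk is the heart of the fix: the periodic emission is stopped before the final event fires, so the two can never race, and listeners are detached only after they have drained. The same shutdown pattern in a generic, self-contained asyncio sketch (all names illustrative, not crawlee's API):

```python
import asyncio


class PeriodicFlusher:
    """Illustrative shutdown pattern: periodic flushes plus one final flush on close."""

    def __init__(self, interval: float = 0.1) -> None:
        self._interval = interval
        self._task: asyncio.Task | None = None
        self.flush_count = 0

    async def _flush(self) -> None:
        self.flush_count += 1  # Stand-in for emitting a persist state event.

    async def _run_periodically(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            await self._flush()

    async def __aenter__(self) -> 'PeriodicFlusher':
        self._task = asyncio.create_task(self._run_periodically())
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        # 1. Stop the periodic task first, so it cannot race with the final flush.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        # 2. Emit one final flush so the latest state is always captured.
        await self._flush()


async def main() -> None:
    async with PeriodicFlusher() as flusher:
        await asyncio.sleep(0.35)
    assert flusher.flush_count >= 1  # At least the final flush always happens.


asyncio.run(main())
```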

src/crawlee/statistics/_models.py

Lines changed: 32 additions & 1 deletion
```diff
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import json
+import warnings
 from dataclasses import asdict, dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Annotated, Any
@@ -76,7 +77,6 @@ class StatisticsState(BaseModel):
     crawler_started_at: Annotated[datetime | None, Field(alias='crawlerStartedAt')] = None
     crawler_last_started_at: Annotated[datetime | None, Field(alias='crawlerLastStartTimestamp')] = None
     crawler_finished_at: Annotated[datetime | None, Field(alias='crawlerFinishedAt')] = None
-    crawler_runtime: Annotated[timedelta_ms, Field(alias='crawlerRuntimeMillis')] = timedelta()
     errors: dict[str, Any] = Field(default_factory=dict)
     retry_errors: dict[str, Any] = Field(alias='retryErrors', default_factory=dict)
     requests_with_status_code: dict[str, int] = Field(alias='requestsWithStatusCode', default_factory=dict)
@@ -93,6 +93,37 @@
         ),
     ] = {}
 
+    # Used to track the crawler runtime, that had already been persisted. This is the runtime from previous runs.
+    _runtime_offset: Annotated[timedelta, Field(exclude=True)] = timedelta()
+
+    def model_post_init(self, /, __context: Any) -> None:
+        self._runtime_offset = self.crawler_runtime or self._runtime_offset
+
+    @property
+    def crawler_runtime(self) -> timedelta:
+        if self.crawler_last_started_at:
+            finished_at = self.crawler_finished_at or datetime.now(timezone.utc)
+            return self._runtime_offset + finished_at - self.crawler_last_started_at
+        return self._runtime_offset
+
+    @crawler_runtime.setter
+    def crawler_runtime(self, value: timedelta) -> None:
+        # Setter for backwards compatibility only, the crawler_runtime is now computed_field, and cant be set manually.
+        # To be removed in v2 release https://github.com/apify/crawlee-python/issues/1567
+        warnings.warn(
+            f"Setting 'crawler_runtime' is deprecated and will be removed in a future version."
+            f' Value {value} will not be used.',
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+    @computed_field(alias='crawlerRuntimeMillis')
+    def crawler_runtime_for_serialization(self) -> timedelta:
+        if self.crawler_last_started_at:
+            finished_at = self.crawler_finished_at or datetime.now(timezone.utc)
+            return self._runtime_offset + finished_at - self.crawler_last_started_at
+        return self._runtime_offset
+
 @computed_field(alias='requestTotalDurationMillis', return_type=timedelta_ms)  # type: ignore[prop-decorator]
 @property
 def request_total_duration(self) -> timedelta:
```
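The notable pattern here is converting a stored pydantic field into a derived value without breaking old callers: a plain `@property` serves reads, a deprecated setter absorbs legacy writes, and a separately named `@computed_field` keeps the original serialization alias. A stripped-down sketch of that pattern under pydantic v2 (model and field names are illustrative):

```python
import warnings
from datetime import datetime, timedelta, timezone

from pydantic import BaseModel, computed_field


class RunState(BaseModel):
    started_at: datetime | None = None
    finished_at: datetime | None = None

    @property
    def runtime(self) -> timedelta:
        # Derived on every access, so it is always consistent with the timestamps.
        if self.started_at:
            end = self.finished_at or datetime.now(timezone.utc)
            return end - self.started_at
        return timedelta()

    @runtime.setter
    def runtime(self, value: timedelta) -> None:
        # Kept only so legacy `state.runtime = ...` assignments do not crash.
        warnings.warn('Setting runtime is deprecated; the value is ignored.', DeprecationWarning, stacklevel=2)

    @computed_field(alias='runtimeMillis')  # Serialized under the old alias.
    def runtime_for_serialization(self) -> timedelta:
        return self.runtime


state = RunState(started_at=datetime.now(timezone.utc))
print(state.runtime)  # Computed from the timestamps at read time.
print(state.model_dump(by_alias=True)['runtimeMillis'])  # Same value at dump time.
```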

src/crawlee/statistics/_statistics.py

Lines changed: 2 additions & 21 deletions
```diff
@@ -110,9 +110,6 @@ def __init__(
         # Flag to indicate the context state.
         self._active = False
 
-        # Pre-existing runtime offset, that can be non-zero when restoring serialized state from KVS.
-        self._runtime_offset = timedelta(seconds=0)
-
     def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]:
         """Create near copy of the `Statistics` with replaced `state_model`."""
         new_statistics: Statistics[TNewStatisticsState] = Statistics(
@@ -168,8 +165,8 @@ async def __aenter__(self) -> Self:
             raise RuntimeError(f'The {self.__class__.__name__} is already active.')
 
         await self._state.initialize()
-
-        self._runtime_offset = self.state.crawler_runtime
+        # Reset `crawler_finished_at` to indicate a new run in progress.
+        self.state.crawler_finished_at = None
 
         # Start periodic logging and let it print initial state before activation.
         self._periodic_logger.start()
@@ -200,10 +197,6 @@ async def __aexit__(
         # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime
         await self._periodic_logger.stop()
         self.state.crawler_finished_at = datetime.now(timezone.utc)
-        self.state.crawler_runtime = (
-            self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at
-        )
-
         self._active = False
         await self._state.teardown()
 
@@ -262,20 +255,8 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None:
 
         del self._requests_in_progress[request_id_or_key]
 
-    def _update_crawler_runtime(self) -> None:
-        current_run_duration = (
-            (datetime.now(timezone.utc) - self.state.crawler_last_started_at)
-            if self.state.crawler_last_started_at
-            else timedelta()
-        )
-        self.state.crawler_runtime = current_run_duration + self._runtime_offset
-
     def calculate(self) -> FinalStatistics:
         """Calculate the current statistics."""
-        if self._active:
-            # Only update state when active. If not, just report the last known runtime.
-            self._update_crawler_runtime()
-
         total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
         serialized_state = state.model_dump(by_alias=False)
```
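With the offset now living inside `StatisticsState`, `Statistics` no longer mutates the runtime at all; resumed runs simply accumulate on top of the restored value. A small sketch of the accumulation arithmetic in isolation (plain Python, illustrative names):

```python
from datetime import datetime, timedelta, timezone


def runtime(offset: timedelta, last_started_at: datetime | None, finished_at: datetime | None) -> timedelta:
    """Mirror of the computed runtime: previous runs' offset plus the current run, if any."""
    if last_started_at:
        end = finished_at or datetime.now(timezone.utc)
        return offset + (end - last_started_at)
    return offset


# First run: 5 seconds, persisted; its runtime becomes the offset of the next run.
t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
first = runtime(timedelta(), t0, t0 + timedelta(seconds=5))
# Resumed run: 3 more seconds on top of the restored offset.
second = runtime(first, t0 + timedelta(minutes=1), t0 + timedelta(minutes=1, seconds=3))
assert second == timedelta(seconds=8)
```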

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -1673,7 +1673,6 @@ def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState
     return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
 
 
-@pytest.mark.skip(reason='This test is flaky, see https://github.com/apify/crawlee-python/issues/1560.')
 async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
     """Test that crawler statistics persist and are loaded correctly.
```

tests/unit/events/test_event_manager.py

Lines changed: 12 additions & 0 deletions
```diff
@@ -5,6 +5,7 @@
 from datetime import timedelta
 from functools import update_wrapper
 from typing import TYPE_CHECKING, Any
+from unittest import mock
 from unittest.mock import AsyncMock, MagicMock
 
 import pytest
@@ -207,3 +208,14 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event
     await event_manager.wait_for_all_listeners_to_complete()
 
     assert event_manager.active is True
+
+
+async def test_event_manager_in_context_persistence() -> None:
+    """Test that entering the `EventManager` context emits persist state event at least once."""
+    event_manager = EventManager()
+
+    with mock.patch.object(event_manager, '_emit_persist_state_event', AsyncMock()) as mocked_emit_persist_state_event:
+        async with event_manager:
+            pass
+
+    assert mocked_emit_persist_state_event.call_count >= 1
```
