Skip to content

Commit 4016c0d

Browse files
Pijukatelvdusek
andauthored
fix: Fix snapshots handling (#692)
Small fixes to snapshot handling: - Ensure time ordering of snapshots. - Remove fake time weight for samples from same time. - Remove unnecessary reversing of snapshot. - Add test for memory and cpu snapshot ordering. --------- Co-authored-by: Vlada Dusek <[email protected]>
1 parent 19f5add commit 4016c0d

File tree

8 files changed

+249
-199
lines changed

8 files changed

+249
-199
lines changed

poetry.lock

Lines changed: 58 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ pytest-xdist = "~3.6.0"
8686
respx = "~0.21.0"
8787
ruff = "~0.7.0"
8888
setuptools = "~75.3.0" # setuptools are used by pytest, but not explicitly required
89+
sortedcontainers-stubs = "^2.4.2"
8990
types-beautifulsoup4 = "~4.12.0.20240229"
9091
types-colorama = "~0.4.15.20240106"
9192
types-psutil = "~5.9.5.20240205"

src/crawlee/_autoscaling/snapshotter.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,19 @@
44

55
from datetime import datetime, timedelta, timezone
66
from logging import getLogger
7-
from typing import TYPE_CHECKING, cast
7+
from operator import attrgetter
8+
from typing import TYPE_CHECKING, TypeVar, cast
89

910
import psutil
10-
11-
from crawlee._autoscaling.types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot
11+
from sortedcontainers import SortedList
12+
13+
from crawlee._autoscaling.types import (
14+
ClientSnapshot,
15+
CpuSnapshot,
16+
EventLoopSnapshot,
17+
MemorySnapshot,
18+
Snapshot,
19+
)
1220
from crawlee._utils.byte_size import ByteSize
1321
from crawlee._utils.docs import docs_group
1422
from crawlee._utils.recurring_task import RecurringTask
@@ -21,6 +29,8 @@
2129

2230
logger = getLogger(__name__)
2331

32+
T = TypeVar('T')
33+
2434

2535
@docs_group('Classes')
2636
class Snapshotter:
@@ -94,16 +104,20 @@ def __init__(
94104
cast(float, available_memory_ratio)
95105
)
96106

97-
self._cpu_snapshots: list[CpuSnapshot] = []
98-
self._event_loop_snapshots: list[EventLoopSnapshot] = []
99-
self._memory_snapshots: list[MemorySnapshot] = []
100-
self._client_snapshots: list[ClientSnapshot] = []
107+
self._cpu_snapshots = self._get_sorted_list_by_created_at(list[CpuSnapshot]())
108+
self._event_loop_snapshots = self._get_sorted_list_by_created_at(list[EventLoopSnapshot]())
109+
self._memory_snapshots = self._get_sorted_list_by_created_at(list[MemorySnapshot]())
110+
self._client_snapshots = self._get_sorted_list_by_created_at(list[ClientSnapshot]())
101111

102112
self._snapshot_event_loop_task = RecurringTask(self._snapshot_event_loop, self._event_loop_snapshot_interval)
103113
self._snapshot_client_task = RecurringTask(self._snapshot_client, self._client_snapshot_interval)
104114

105115
self._timestamp_of_last_memory_warning: datetime = datetime.now(timezone.utc) - timedelta(hours=1)
106116

117+
@staticmethod
118+
def _get_sorted_list_by_created_at(input_list: list[T]) -> SortedList[T]:
119+
return SortedList(input_list, key=attrgetter('created_at'))
120+
107121
@staticmethod
108122
def _get_default_max_memory_size(available_memory_ratio: float) -> ByteSize:
109123
"""Default `memory_max_size` is 1/4 of the total system memory."""
@@ -194,7 +208,7 @@ def _get_sample(snapshots: list[Snapshot], duration: timedelta | None = None) ->
194208
return []
195209

196210
latest_time = snapshots[-1].created_at
197-
return [snapshot for snapshot in reversed(snapshots) if latest_time - snapshot.created_at <= duration]
211+
return [snapshot for snapshot in snapshots if latest_time - snapshot.created_at <= duration]
198212

199213
def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
200214
"""Captures a snapshot of the current CPU usage.
@@ -213,7 +227,7 @@ def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
213227

214228
snapshots = cast(list[Snapshot], self._cpu_snapshots)
215229
self._prune_snapshots(snapshots, event_data.cpu_info.created_at)
216-
self._cpu_snapshots.append(snapshot)
230+
self._cpu_snapshots.add(snapshot)
217231

218232
def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
219233
"""Captures a snapshot of the current memory usage.
@@ -233,8 +247,7 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
233247

234248
snapshots = cast(list[Snapshot], self._memory_snapshots)
235249
self._prune_snapshots(snapshots, snapshot.created_at)
236-
self._memory_snapshots.append(snapshot)
237-
250+
self._memory_snapshots.add(snapshot)
238251
self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at)
239252

240253
def _snapshot_event_loop(self) -> None:
@@ -254,7 +267,7 @@ def _snapshot_event_loop(self) -> None:
254267

255268
snapshots = cast(list[Snapshot], self._event_loop_snapshots)
256269
self._prune_snapshots(snapshots, snapshot.created_at)
257-
self._event_loop_snapshots.append(snapshot)
270+
self._event_loop_snapshots.add(snapshot)
258271

259272
def _snapshot_client(self) -> None:
260273
"""Captures a snapshot of the current API state by checking for rate limit errors (HTTP 429).
@@ -271,7 +284,7 @@ def _snapshot_client(self) -> None:
271284

272285
snapshots = cast(list[Snapshot], self._client_snapshots)
273286
self._prune_snapshots(snapshots, snapshot.created_at)
274-
self._client_snapshots.append(snapshot)
287+
self._client_snapshots.add(snapshot)
275288

276289
def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
277290
"""Removes snapshots that are older than the `self._snapshot_history`.

0 commit comments

Comments
 (0)