Skip to content

Commit 486fe6d

Browse files
authored
fix: Fix init of context managers and context handling in BasicCrawler (#714)
### Problems - The `EventManager` could be initialized multiple times, such as when running a `Crawler` wrapped inside an `Actor`. - In `crawler.run`, the async context was entered and exited directly, which could lead to issues if the caller had already entered it. This scenario might occur when users provide their own instances of `BrowserPool`, `SessionPool`, `EventManager`, or others. ### Solution - Address these issues by introducing an `active` flag to the existing context managers to prevent multiple initializations. - Implement an `ensure_context` helper and apply it to methods where context management is required. - Fix & improve tests to ensure these cases are covered. ### Others - I added missing names to asyncio tasks.
1 parent dcf2485 commit 486fe6d

File tree

18 files changed

+392
-87
lines changed

18 files changed

+392
-87
lines changed

src/crawlee/_autoscaling/snapshotter.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
Snapshot,
1919
)
2020
from crawlee._utils.byte_size import ByteSize
21+
from crawlee._utils.context import ensure_context
2122
from crawlee._utils.docs import docs_group
2223
from crawlee._utils.recurring_task import RecurringTask
2324
from crawlee.events._types import Event, EventSystemInfoData
@@ -114,6 +115,9 @@ def __init__(
114115

115116
self._timestamp_of_last_memory_warning: datetime = datetime.now(timezone.utc) - timedelta(hours=1)
116117

118+
# Flag to indicate the context state.
119+
self._active = False
120+
117121
@staticmethod
118122
def _get_sorted_list_by_created_at(input_list: list[T]) -> SortedList[T]:
119123
return SortedList(input_list, key=attrgetter('created_at'))
@@ -126,8 +130,21 @@ def _get_default_max_memory_size(available_memory_ratio: float) -> ByteSize:
126130
logger.info(f'Setting max_memory_size of this run to {max_memory_size}.')
127131
return max_memory_size
128132

133+
@property
134+
def active(self) -> bool:
135+
"""Indicates whether the context is active."""
136+
return self._active
137+
129138
async def __aenter__(self) -> Snapshotter:
130-
"""Starts capturing snapshots at configured intervals."""
139+
"""Starts capturing snapshots at configured intervals.
140+
141+
Raises:
142+
RuntimeError: If the context manager is already active.
143+
"""
144+
if self._active:
145+
raise RuntimeError(f'The {self.__class__.__name__} is already active.')
146+
147+
self._active = True
131148
self._event_manager.on(event=Event.SYSTEM_INFO, listener=self._snapshot_cpu)
132149
self._event_manager.on(event=Event.SYSTEM_INFO, listener=self._snapshot_memory)
133150
self._snapshot_event_loop_task.start()
@@ -144,12 +161,20 @@ async def __aexit__(
144161
145162
This method stops capturing snapshots of system resources (CPU, memory, event loop, and client information).
146163
It should be called to terminate resource capturing when it is no longer needed.
164+
165+
Raises:
166+
RuntimeError: If the context manager is not active.
147167
"""
168+
if not self._active:
169+
raise RuntimeError(f'The {self.__class__.__name__} is not active.')
170+
148171
self._event_manager.off(event=Event.SYSTEM_INFO, listener=self._snapshot_cpu)
149172
self._event_manager.off(event=Event.SYSTEM_INFO, listener=self._snapshot_memory)
150173
await self._snapshot_event_loop_task.stop()
151174
await self._snapshot_client_task.stop()
175+
self._active = False
152176

177+
@ensure_context
153178
def get_memory_sample(self, duration: timedelta | None = None) -> list[Snapshot]:
154179
"""Returns a sample of the latest memory snapshots.
155180
@@ -162,6 +187,7 @@ def get_memory_sample(self, duration: timedelta | None = None) -> list[Snapshot]
162187
snapshots = cast(list[Snapshot], self._memory_snapshots)
163188
return self._get_sample(snapshots, duration)
164189

190+
@ensure_context
165191
def get_event_loop_sample(self, duration: timedelta | None = None) -> list[Snapshot]:
166192
"""Returns a sample of the latest event loop snapshots.
167193
@@ -174,6 +200,7 @@ def get_event_loop_sample(self, duration: timedelta | None = None) -> list[Snaps
174200
snapshots = cast(list[Snapshot], self._event_loop_snapshots)
175201
return self._get_sample(snapshots, duration)
176202

203+
@ensure_context
177204
def get_cpu_sample(self, duration: timedelta | None = None) -> list[Snapshot]:
178205
"""Returns a sample of the latest CPU snapshots.
179206
@@ -186,6 +213,7 @@ def get_cpu_sample(self, duration: timedelta | None = None) -> list[Snapshot]:
186213
snapshots = cast(list[Snapshot], self._cpu_snapshots)
187214
return self._get_sample(snapshots, duration)
188215

216+
@ensure_context
189217
def get_client_sample(self, duration: timedelta | None = None) -> list[Snapshot]:
190218
"""Returns a sample of the latest client snapshots.
191219

src/crawlee/_utils/context.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from functools import wraps
5+
from typing import Any, Callable, TypeVar
6+
7+
T = TypeVar('T', bound=Callable[..., Any])
8+
9+
10+
def ensure_context(method: T) -> T:
11+
"""Decorator to ensure the (async) context manager is initialized before calling the method.
12+
13+
Args:
14+
method: The method to wrap.
15+
16+
Returns:
17+
The wrapped method with context checking applied.
18+
"""
19+
20+
@wraps(method)
21+
def sync_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
22+
if not hasattr(self, 'active'):
23+
raise RuntimeError(f'The {self.__class__.__name__} does not have the "active" attribute.')
24+
25+
if not self.active:
26+
raise RuntimeError(f'The {self.__class__.__name__} is not active. Use it within the context.')
27+
28+
return method(self, *args, **kwargs)
29+
30+
@wraps(method)
31+
async def async_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
32+
if not hasattr(self, 'active'):
33+
raise RuntimeError(f'The {self.__class__.__name__} does not have the "active" attribute.')
34+
35+
if not self.active:
36+
raise RuntimeError(f'The {self.__class__.__name__} is not active. Use it within the async context.')
37+
38+
return await method(self, *args, **kwargs)
39+
40+
return async_wrapper if asyncio.iscoroutinefunction(method) else sync_wrapper # type: ignore[return-value]

src/crawlee/basic_crawler/_basic_crawler.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ def __init__(
255255
else None,
256256
available_memory_ratio=self._configuration.available_memory_ratio,
257257
)
258-
self._pool = AutoscaledPool(
258+
self._autoscaled_pool = AutoscaledPool(
259259
system_status=SystemStatus(self._snapshotter),
260260
is_finished_function=self.__is_finished_function,
261261
is_task_ready_function=self.__is_task_ready_function,
@@ -442,7 +442,7 @@ def sigint_handler() -> None:
442442

443443
run_task.cancel()
444444

445-
run_task = asyncio.create_task(self._run_crawler())
445+
run_task = asyncio.create_task(self._run_crawler(), name='run_crawler_task')
446446

447447
with suppress(NotImplementedError): # event loop signal handlers are not supported on Windows
448448
asyncio.get_running_loop().add_signal_handler(signal.SIGINT, sigint_handler)
@@ -476,18 +476,25 @@ def sigint_handler() -> None:
476476
return final_statistics
477477

478478
async def _run_crawler(self) -> None:
479-
async with AsyncExitStack() as exit_stack:
480-
await exit_stack.enter_async_context(self._event_manager)
481-
await exit_stack.enter_async_context(self._snapshotter)
482-
await exit_stack.enter_async_context(self._statistics)
483-
484-
if self._use_session_pool:
485-
await exit_stack.enter_async_context(self._session_pool)
479+
# Collect the context managers to be entered. Context managers that are already active are excluded,
480+
# as they were likely entered by the caller, who will also be responsible for exiting them.
481+
contexts_to_enter = [
482+
cm
483+
for cm in (
484+
self._event_manager,
485+
self._snapshotter,
486+
self._statistics,
487+
self._session_pool if self._use_session_pool else None,
488+
*self._additional_context_managers,
489+
)
490+
if cm and getattr(cm, 'active', False) is False
491+
]
486492

487-
for context_manager in self._additional_context_managers:
488-
await exit_stack.enter_async_context(context_manager)
493+
async with AsyncExitStack() as exit_stack:
494+
for context in contexts_to_enter:
495+
await exit_stack.enter_async_context(context)
489496

490-
await self._pool.run()
497+
await self._autoscaled_pool.run()
491498

492499
async def add_requests(
493500
self,

src/crawlee/browsers/_base_browser_plugin.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class BaseBrowserPlugin(ABC):
2323
AUTOMATION_LIBRARY: str | None = None
2424
"""The name of the automation library that the plugin is managing."""
2525

26+
@property
27+
@abstractmethod
28+
def active(self) -> bool:
29+
"""Indicates whether the context is active."""
30+
2631
@property
2732
@abstractmethod
2833
def browser_type(self) -> BrowserType:
@@ -45,7 +50,11 @@ def max_open_pages_per_browser(self) -> int:
4550

4651
@abstractmethod
4752
async def __aenter__(self) -> BaseBrowserPlugin:
48-
"""Enter the context manager and initialize the browser plugin."""
53+
"""Enter the context manager and initialize the browser plugin.
54+
55+
Raises:
56+
RuntimeError: If the context manager is already active.
57+
"""
4958

5059
@abstractmethod
5160
async def __aexit__(
@@ -54,7 +63,11 @@ async def __aexit__(
5463
exc_value: BaseException | None,
5564
exc_traceback: TracebackType | None,
5665
) -> None:
57-
"""Exit the context manager and close the browser plugin."""
66+
"""Exit the context manager and close the browser plugin.
67+
68+
Raises:
69+
RuntimeError: If the context manager is not active.
70+
"""
5871

5972
@abstractmethod
6073
async def new_browser(self) -> BaseBrowserController:

src/crawlee/browsers/_browser_pool.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing import TYPE_CHECKING, Any
1111
from weakref import WeakValueDictionary
1212

13+
from crawlee._utils.context import ensure_context
1314
from crawlee._utils.crypto import crypto_random_object_id
1415
from crawlee._utils.docs import docs_group
1516
from crawlee._utils.recurring_task import RecurringTask
@@ -91,6 +92,9 @@ def __init__(
9192
self._pages = WeakValueDictionary[str, CrawleePage]() # Track the pages in the pool
9293
self._plugins_cycle = itertools.cycle(self._plugins) # Cycle through the plugins
9394

95+
# Flag to indicate the context state.
96+
self._active = False
97+
9498
@classmethod
9599
def with_default_plugin(
96100
cls,
@@ -148,10 +152,21 @@ def total_pages_count(self) -> int:
148152
"""Returns the total number of pages opened since the browser pool was launched."""
149153
return self._total_pages_count
150154

155+
@property
156+
def active(self) -> bool:
157+
"""Indicates whether the context is active."""
158+
return self._active
159+
151160
async def __aenter__(self) -> BrowserPool:
152-
"""Enter the context manager and initialize all browser plugins."""
153-
logger.debug('Initializing browser pool.')
161+
"""Enter the context manager and initialize all browser plugins.
162+
163+
Raises:
164+
RuntimeError: If the context manager is already active.
165+
"""
166+
if self._active:
167+
raise RuntimeError(f'The {self.__class__.__name__} is already active.')
154168

169+
self._active = True
155170
# Start the recurring tasks for identifying and closing inactive browsers
156171
self._identify_inactive_browsers_task.start()
157172
self._close_inactive_browsers_task.start()
@@ -172,8 +187,13 @@ async def __aexit__(
172187
exc_value: BaseException | None,
173188
exc_traceback: TracebackType | None,
174189
) -> None:
175-
"""Exit the context manager and close all browser plugins."""
176-
logger.debug('Closing browser pool.')
190+
"""Exit the context manager and close all browser plugins.
191+
192+
Raises:
193+
RuntimeError: If the context manager is not active.
194+
"""
195+
if not self._active:
196+
raise RuntimeError(f'The {self.__class__.__name__} is not active.')
177197

178198
await self._identify_inactive_browsers_task.stop()
179199
await self._close_inactive_browsers_task.stop()
@@ -184,6 +204,9 @@ async def __aexit__(
184204
for plugin in self._plugins:
185205
await plugin.__aexit__(exc_type, exc_value, exc_traceback)
186206

207+
self._active = False
208+
209+
@ensure_context
187210
async def new_page(
188211
self,
189212
*,
@@ -213,6 +236,7 @@ async def new_page(
213236

214237
return await self._get_new_page(page_id, plugin, proxy_info)
215238

239+
@ensure_context
216240
async def new_page_with_each_plugin(self) -> Sequence[CrawleePage]:
217241
"""Create a new page with each browser plugin in the pool.
218242

src/crawlee/browsers/_playwright_browser_plugin.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from playwright.async_api import Playwright, async_playwright
99
from typing_extensions import override
1010

11+
from crawlee._utils.context import ensure_context
1112
from crawlee._utils.docs import docs_group
1213
from crawlee.browsers._base_browser_plugin import BaseBrowserPlugin
1314
from crawlee.browsers._playwright_browser_controller import PlaywrightBrowserController
@@ -55,6 +56,14 @@ def __init__(
5556
self._playwright_context_manager = async_playwright()
5657
self._playwright: Playwright | None = None
5758

59+
# Flag to indicate the context state.
60+
self._active = False
61+
62+
@property
63+
@override
64+
def active(self) -> bool:
65+
return self._active
66+
5867
@property
5968
@override
6069
def browser_type(self) -> BrowserType:
@@ -77,7 +86,10 @@ def max_open_pages_per_browser(self) -> int:
7786

7887
@override
7988
async def __aenter__(self) -> PlaywrightBrowserPlugin:
80-
logger.debug('Initializing Playwright browser plugin.')
89+
if self._active:
90+
raise RuntimeError(f'The {self.__class__.__name__} is already active.')
91+
92+
self._active = True
8193
self._playwright = await self._playwright_context_manager.__aenter__()
8294
return self
8395

@@ -88,10 +100,14 @@ async def __aexit__(
88100
exc_value: BaseException | None,
89101
exc_traceback: TracebackType | None,
90102
) -> None:
91-
logger.debug('Closing Playwright browser plugin.')
103+
if not self._active:
104+
raise RuntimeError(f'The {self.__class__.__name__} is not active.')
105+
92106
await self._playwright_context_manager.__aexit__(exc_type, exc_value, exc_traceback)
107+
self._active = False
93108

94109
@override
110+
@ensure_context
95111
async def new_browser(self) -> PlaywrightBrowserController:
96112
if not self._playwright:
97113
raise RuntimeError('Playwright browser plugin is not initialized.')

0 commit comments

Comments
 (0)