Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ jobs:
- name: Run twisted tests
run: tox -e py-twisted

- name: Run pinned dependency tests (Windows, asyncio tests)
if: runner.os == 'Windows'
run: tox -e py-pinned

- name: Run pinned dependency tests (Windows, Twisted tests)
if: runner.os == 'Windows'
run: tox -e py-pinned-twisted

- name: Upload coverage report (Linux)
if: runner.os == 'Linux'
env:
Expand Down
1 change: 1 addition & 0 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ disable=
too-few-public-methods,
too-many-arguments,
too-many-instance-attributes,
too-many-lines,
# tests
duplicate-code,
import-outside-toplevel,
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[tool.black]
line-length = 99

[tool.pytest.ini_options]
reruns = "2"
60 changes: 49 additions & 11 deletions scrapy_playwright/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import platform
import threading
from dataclasses import dataclass
from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union

import scrapy
Expand Down Expand Up @@ -104,6 +105,13 @@ async def _get_header_value(
return None


@dataclass
class _QueueItem:
    """Work unit for the threaded event loop: a coroutine paired with the
    promise used to hand its result back to the caller's thread.
    """

    # Coroutine to await on the threaded loop.
    coro: Awaitable
    # Twisted Deferred (fired via the reactor) or asyncio Future (resolved via `loop`).
    promise: Deferred | asyncio.Future
    # Caller's event loop; only set when `promise` is an asyncio Future.
    loop: asyncio.AbstractEventLoop | None = None


class _ThreadedLoopAdapter:
"""Utility class to start an asyncio event loop in a new thread and redirect coroutines.
This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
Expand All @@ -116,32 +124,60 @@ class _ThreadedLoopAdapter:
_stop_events: Dict[int, asyncio.Event] = {}

@classmethod
async def _handle_coro(cls, coro: Awaitable, dfd: Deferred) -> None:
async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None:
from twisted.internet import reactor

dfd: Deferred = queue_item.promise

try:
result = await coro
result = await queue_item.coro
except Exception as exc:
reactor.callFromThread(dfd.errback, failure.Failure(exc))
else:
reactor.callFromThread(dfd.callback, result)

@classmethod
async def _handle_coro_future(cls, queue_item: _QueueItem) -> None:
future: asyncio.Future = queue_item.promise
loop: asyncio.AbstractEventLoop = queue_item.loop # type: ignore[assignment]
try:
result = await queue_item.coro
except Exception as exc:
loop.call_soon_threadsafe(future.set_exception, exc)
else:
loop.call_soon_threadsafe(future.set_result, result)

@classmethod
async def _process_queue(cls) -> None:
while any(not ev.is_set() for ev in cls._stop_events.values()):
coro, dfd = await cls._coro_queue.get()
asyncio.create_task(cls._handle_coro(coro, dfd))
queue_item = await cls._coro_queue.get()
if isinstance(queue_item.promise, asyncio.Future):
asyncio.create_task(cls._handle_coro_future(queue_item))
elif isinstance(queue_item.promise, Deferred):
asyncio.create_task(cls._handle_coro_deferred(queue_item))
cls._coro_queue.task_done()

@classmethod
def _deferred_from_coro(cls, coro) -> Deferred:
def _deferred_from_coro(cls, coro: Awaitable) -> Deferred:
dfd: Deferred = Deferred()
asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, dfd)), cls._loop)
queue_item = _QueueItem(coro=coro, promise=dfd)
asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
return dfd

@classmethod
def start(cls, caller_id: int) -> None:
cls._stop_events[caller_id] = asyncio.Event()
def _future_from_coro(cls, coro: Awaitable) -> asyncio.Future:
target_loop = asyncio.get_running_loop() # Scrapy thread loop
future: asyncio.Future = asyncio.Future()
queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop)
asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
return future

@classmethod
def start(cls, download_handler_id: int) -> None:
"""Start the event loop in a new thread if not already started.
Should be called from the Scrapy thread.
"""
cls._stop_events[download_handler_id] = asyncio.Event()
if not getattr(cls, "_loop", None):
policy = asyncio.DefaultEventLoopPolicy()
if platform.system() == "Windows":
Expand All @@ -155,9 +191,11 @@ def start(cls, caller_id: int) -> None:
asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop)

@classmethod
def stop(cls, caller_id: int) -> None:
"""Wait until all handlers are closed to stop the event loop and join the thread."""
cls._stop_events[caller_id].set()
def stop(cls, download_handler_id: int) -> None:
"""Wait until all handlers are closed to stop the event loop and join the thread.
Should be called from the Scrapy thread.
"""
cls._stop_events[download_handler_id].set()
if all(ev.is_set() for ev in cls._stop_events.values()):
asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop)
cls._loop.call_soon_threadsafe(cls._loop.stop)
Expand Down
34 changes: 23 additions & 11 deletions scrapy_playwright/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from scrapy.http.headers import Headers
from scrapy.responsetypes import responsetypes
from scrapy.settings import Settings
from scrapy.utils.defer import deferred_from_coro, maybe_deferred_to_future
from scrapy.utils.defer import deferred_from_coro
from scrapy.utils.misc import load_object
from scrapy.utils.reactor import verify_installed_reactor
from twisted.internet.defer import Deferred, inlineCallbacks
Expand Down Expand Up @@ -144,18 +144,21 @@ def __init__(self, crawler: Crawler) -> None:
verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor")
if _SCRAPY_ASYNC_API:
super().__init__(crawler=crawler)
crawler.signals.connect(self._launch, signals.engine_started)
else:
super().__init__( # pylint: disable=unexpected-keyword-arg
settings=crawler.settings, crawler=crawler
)
crawler.signals.connect(self._engine_started, signals.engine_started)
self.stats = crawler.stats
self.config = Config.from_settings(crawler.settings)

if self.config.use_threaded_loop:
_ThreadedLoopAdapter.start(id(self))

if _SCRAPY_ASYNC_API:
crawler.signals.connect(self._maybe_launch_in_thread, signals.engine_started)
else:
crawler.signals.connect(self._engine_started, signals.engine_started)

self.browser_launch_lock = asyncio.Lock()
self.context_launch_lock = asyncio.Lock()
self.context_wrappers: Dict[str, BrowserContextWrapper] = {}
Expand Down Expand Up @@ -186,10 +189,17 @@ def _deferred_from_coro(self, coro: Awaitable) -> Deferred:
return _ThreadedLoopAdapter._deferred_from_coro(coro)
return deferred_from_coro(coro)

def _maybe_future_from_coro(self, coro: Awaitable) -> Awaitable | asyncio.Future:
if self.config.use_threaded_loop:
return _ThreadedLoopAdapter._future_from_coro(coro)
return coro

    def _engine_started(self) -> Deferred:
        """Launch the browser. Use the engine_started signal as it supports returning deferreds."""
        return self._deferred_from_coro(self._launch())

    async def _maybe_launch_in_thread(self) -> None:
        """Launch the browser, routing through the threaded loop when it is enabled."""
        await self._maybe_future_from_coro(self._launch())

async def _launch(self) -> None:
"""Launch Playwright manager and configured startup context(s)."""
logger.info("Starting download handler")
Expand Down Expand Up @@ -362,7 +372,9 @@ def _set_max_concurrent_context_count(self):
async def close(self) -> None:
logger.info("Closing download handler")
await super().close()
await self._close()
await self._maybe_future_from_coro(self._close())
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

else:

Expand All @@ -371,6 +383,8 @@ def close(self) -> Deferred: # pylint: disable=invalid-overridden-method
logger.info("Closing download handler")
yield super().close()
yield self._deferred_from_coro(self._close())
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

async def _close(self) -> None:
with suppress(TargetClosedError):
Expand All @@ -383,16 +397,13 @@ async def _close(self) -> None:
await self.playwright_context_manager.__aexit__()
if self.playwright:
await self.playwright.stop()
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

if _SCRAPY_ASYNC_API:

async def download_request(self, request: Request) -> Response:
if request.meta.get("playwright"):
return await maybe_deferred_to_future(
self._deferred_from_coro(self._download_request(request, self._crawler.spider))
)
coro = self._download_request(request)
return await self._maybe_future_from_coro(coro)
return await super().download_request( # pylint: disable=no-value-for-parameter
request
)
Expand All @@ -408,7 +419,8 @@ def download_request( # type: ignore[misc] # pylint: disable=invalid-overridden
request=request, spider=spider
)

async def _download_request(self, request: Request, spider: Spider) -> Response:
async def _download_request(self, request: Request, spider: Spider | None = None) -> Response:
spider = spider or self._crawler.spider
counter = 0
while True:
try:
Expand Down
6 changes: 2 additions & 4 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ def allow_windows(test_method):

@wraps(test_method)
async def wrapped(self, *args, **kwargs):
caller_id = 1234
_ThreadedLoopAdapter.start(caller_id)
_ThreadedLoopAdapter.start(id(test_method))
coro = test_method(self, *args, **kwargs)
asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result()
_ThreadedLoopAdapter.stop(caller_id)

return wrapped

Expand All @@ -50,7 +48,7 @@ async def make_handler(settings_dict: Optional[dict] = None):
crawler = get_crawler(settings_dict=settings)
handler = ScrapyPlaywrightDownloadHandler(crawler=crawler)
try:
await handler._launch()
await handler._maybe_launch_in_thread()
except: # noqa (E722), pylint: disable=bare-except
pass
else:
Expand Down
12 changes: 2 additions & 10 deletions tests/tests_asyncio/test_browser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import platform
import random
import re
import subprocess
Expand Down Expand Up @@ -106,7 +105,6 @@ async def test_connect_devtools(self):
"Connecting to remote browser, ignoring PLAYWRIGHT_LAUNCH_OPTIONS",
) in self._caplog.record_tuples

@pytest.mark.flaky(reruns=3)
@allow_windows
async def test_connect(self):
async with remote_chromium(with_devtools_protocol=False) as browser_url:
Expand Down Expand Up @@ -192,10 +190,7 @@ async def test_browser_closed_restart(self):
== 2 # one at the beginning, one after calling Browser.close() manually
)

@pytest.mark.skipif(
platform.system() == "Windows",
reason="os.kill does not work as expected on Windows",
)
@allow_windows
async def test_browser_crashed_restart(self):
spider = Spider("foo")
async with make_handler(settings_dict={"PLAYWRIGHT_BROWSER_TYPE": "chromium"}) as handler:
Expand Down Expand Up @@ -239,10 +234,7 @@ async def test_browser_crashed_restart(self):
== 2 # one at the beginning, one after killing the browser process
)

@pytest.mark.skipif(
platform.system() == "Windows",
reason="os.kill does not work as expected on Windows",
)
@allow_windows
async def test_browser_crashed_do_not_restart(self):
spider = Spider("foo")
settings_dict = {
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_twisted/test_mixed_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _check_playwright_error(failure, url):
# different errors depending on the platform
self.assertTrue(
f"net::ERR_CONNECTION_REFUSED at {url}" in str(failure.value)
or f"Page.goto: Timeout {self.timeout_ms}ms exceeded" in str(failure.value)
or f"Timeout {self.timeout_ms}ms exceeded" in str(failure.value)
)

req1 = Request(self.server.urljoin("/index.html"))
Expand Down