Skip to content
Merged
8 changes: 8 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ jobs:
- name: Run twisted tests
run: tox -e py-twisted

- name: Run pinned dependency tests (Windows, asyncio tests)
if: runner.os == 'Windows'
run: tox -e py-pinned

- name: Run pinned dependency tests (Windows, Twisted tests)
if: runner.os == 'Windows'
run: tox -e py-pinned-twisted

- name: Upload coverage report (Linux)
if: runner.os == 'Linux'
env:
Expand Down
1 change: 1 addition & 0 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ disable=
too-few-public-methods,
too-many-arguments,
too-many-instance-attributes,
too-many-lines,
# tests
duplicate-code,
import-outside-toplevel,
Expand Down
58 changes: 48 additions & 10 deletions scrapy_playwright/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import platform
import threading
from dataclasses import dataclass
from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union

import scrapy
Expand Down Expand Up @@ -104,6 +105,13 @@ async def _get_header_value(
return None


@dataclass
class _QueueItem:
    """Work item for the threaded event loop: a coroutine to run plus the
    promise (Twisted Deferred or asyncio Future) to resolve with its outcome.
    """

    # Coroutine to be awaited on the threaded loop.
    coro: Awaitable
    # Promise fired with the coroutine's result or exception.
    promise: Deferred | asyncio.Future
    # Loop the asyncio.Future belongs to (the caller's loop); not used when
    # the promise is a Deferred.
    loop: asyncio.AbstractEventLoop | None = None


class _ThreadedLoopAdapter:
"""Utility class to start an asyncio event loop in a new thread and redirect coroutines.
This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
Expand All @@ -116,32 +124,60 @@ class _ThreadedLoopAdapter:
_stop_events: Dict[int, asyncio.Event] = {}

@classmethod
async def _handle_coro(cls, coro: Awaitable, dfd: Deferred) -> None:
async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None:
    """Await the item's coroutine and fire its Deferred with the result.

    Runs on the threaded loop; Deferreds must be fired from the reactor's
    own thread, hence callFromThread.
    """
    from twisted.internet import reactor

    dfd: Deferred = queue_item.promise

    try:
        result = await queue_item.coro
    except Exception as exc:
        # Wrap in a Failure so errback callbacks receive the usual Twisted type.
        reactor.callFromThread(dfd.errback, failure.Failure(exc))
    else:
        reactor.callFromThread(dfd.callback, result)

@classmethod
async def _handle_coro_future(cls, queue_item: _QueueItem) -> None:
    """Await the item's coroutine and resolve its asyncio Future.

    Runs on the threaded loop; the Future belongs to the loop stored in
    the queue item, so resolution is marshalled via call_soon_threadsafe.
    """
    future: asyncio.Future = queue_item.promise
    loop: asyncio.AbstractEventLoop = queue_item.loop  # type: ignore[assignment]
    try:
        result = await queue_item.coro
    except Exception as exc:
        loop.call_soon_threadsafe(future.set_exception, exc)
    else:
        loop.call_soon_threadsafe(future.set_result, result)

@classmethod
async def _process_queue(cls) -> None:
while any(not ev.is_set() for ev in cls._stop_events.values()):
coro, dfd = await cls._coro_queue.get()
asyncio.create_task(cls._handle_coro(coro, dfd))
queue_item = await cls._coro_queue.get()
if isinstance(queue_item.promise, asyncio.Future):
asyncio.create_task(cls._handle_coro_future(queue_item))
elif isinstance(queue_item.promise, Deferred):
asyncio.create_task(cls._handle_coro_deferred(queue_item))
cls._coro_queue.task_done()

@classmethod
def _deferred_from_coro(cls, coro) -> Deferred:
dfd: Deferred = Deferred()
asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, dfd)), cls._loop)
queue_item = _QueueItem(coro=coro, promise=dfd)
asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
return dfd

@classmethod
def start(cls, caller_id: int) -> None:
cls._stop_events[caller_id] = asyncio.Event()
def _future_from_coro(cls, coro) -> asyncio.Future:
    """Schedule *coro* on the threaded loop and return an asyncio Future
    that is resolved on the caller's currently running loop.

    Must be called from a thread with a running event loop (the Scrapy side).
    """
    target_loop = asyncio.get_running_loop()  # Scrapy thread loop
    future: asyncio.Future = asyncio.Future()
    queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop)
    # Hand the item to the queue consumer running on the threaded loop.
    asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
    return future

@classmethod
def start(cls, download_handler_id: int) -> None:
"""Start the event loop in a new thread if not already started.
Should be called from the Scrapy thread.
"""
cls._stop_events[download_handler_id] = asyncio.Event()
if not getattr(cls, "_loop", None):
policy = asyncio.DefaultEventLoopPolicy()
if platform.system() == "Windows":
Expand All @@ -155,9 +191,11 @@ def start(cls, caller_id: int) -> None:
asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop)

@classmethod
def stop(cls, caller_id: int) -> None:
"""Wait until all handlers are closed to stop the event loop and join the thread."""
cls._stop_events[caller_id].set()
def stop(cls, download_handler_id: int) -> None:
"""Wait until all handlers are closed to stop the event loop and join the thread.
Should be called from the Scrapy thread.
"""
cls._stop_events[download_handler_id].set()
if all(ev.is_set() for ev in cls._stop_events.values()):
asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop)
cls._loop.call_soon_threadsafe(cls._loop.stop)
Expand Down
34 changes: 23 additions & 11 deletions scrapy_playwright/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from scrapy.http.headers import Headers
from scrapy.responsetypes import responsetypes
from scrapy.settings import Settings
from scrapy.utils.defer import deferred_from_coro, maybe_deferred_to_future
from scrapy.utils.defer import deferred_from_coro
from scrapy.utils.misc import load_object
from scrapy.utils.reactor import verify_installed_reactor
from twisted.internet.defer import Deferred, inlineCallbacks
Expand Down Expand Up @@ -144,18 +144,21 @@ def __init__(self, crawler: Crawler) -> None:
verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor")
if _SCRAPY_ASYNC_API:
super().__init__(crawler=crawler)
crawler.signals.connect(self._launch, signals.engine_started)
else:
super().__init__( # pylint: disable=unexpected-keyword-arg
settings=crawler.settings, crawler=crawler
)
crawler.signals.connect(self._engine_started, signals.engine_started)
self.stats = crawler.stats
self.config = Config.from_settings(crawler.settings)

if self.config.use_threaded_loop:
_ThreadedLoopAdapter.start(id(self))

if _SCRAPY_ASYNC_API:
crawler.signals.connect(self._maybe_launch_in_thread, signals.engine_started)
else:
crawler.signals.connect(self._engine_started, signals.engine_started)

self.browser_launch_lock = asyncio.Lock()
self.context_launch_lock = asyncio.Lock()
self.context_wrappers: Dict[str, BrowserContextWrapper] = {}
Expand Down Expand Up @@ -186,10 +189,17 @@ def _deferred_from_coro(self, coro: Awaitable) -> Deferred:
return _ThreadedLoopAdapter._deferred_from_coro(coro)
return deferred_from_coro(coro)

def _future_from_coro(self, coro: Awaitable) -> asyncio.Future:
    """Wrap *coro* in an asyncio Future, delegating to the threaded-loop
    adapter when a separate loop is configured, otherwise scheduling it
    on the current loop."""
    if self.config.use_threaded_loop:
        return _ThreadedLoopAdapter._future_from_coro(coro)
    return asyncio.ensure_future(coro)

def _engine_started(self) -> Deferred:
    """Launch the browser. Use the engine_started signal as it supports returning deferreds."""
    # Routed through _deferred_from_coro so the launch runs on the threaded
    # loop when one is configured.
    return self._deferred_from_coro(self._launch())

async def _maybe_launch_in_thread(self) -> None:
    """Launch Playwright, awaiting on the threaded loop when so configured
    (see _future_from_coro); otherwise on the current loop."""
    await self._future_from_coro(self._launch())

async def _launch(self) -> None:
"""Launch Playwright manager and configured startup context(s)."""
logger.info("Starting download handler")
Expand Down Expand Up @@ -362,7 +372,9 @@ def _set_max_concurrent_context_count(self):
async def close(self) -> None:
logger.info("Closing download handler")
await super().close()
await self._close()
await self._future_from_coro(self._close())
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

else:

Expand All @@ -371,6 +383,8 @@ def close(self) -> Deferred: # pylint: disable=invalid-overridden-method
logger.info("Closing download handler")
yield super().close()
yield self._deferred_from_coro(self._close())
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

async def _close(self) -> None:
with suppress(TargetClosedError):
Expand All @@ -383,16 +397,13 @@ async def _close(self) -> None:
await self.playwright_context_manager.__aexit__()
if self.playwright:
await self.playwright.stop()
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

if _SCRAPY_ASYNC_API:

async def download_request(self, request: Request) -> Response:
if request.meta.get("playwright"):
return await maybe_deferred_to_future(
self._deferred_from_coro(self._download_request(request, self._crawler.spider))
)
coro = self._download_request(request)
return await self._future_from_coro(coro)
return await super().download_request( # pylint: disable=no-value-for-parameter
request
)
Expand All @@ -408,7 +419,8 @@ def download_request( # type: ignore[misc] # pylint: disable=invalid-overridden
request=request, spider=spider
)

async def _download_request(self, request: Request, spider: Spider) -> Response:
async def _download_request(self, request: Request, spider: Spider | None = None) -> Response:
spider = spider or self._crawler.spider
counter = 0
while True:
try:
Expand Down
9 changes: 4 additions & 5 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ def allow_windows(test_method):

@wraps(test_method)
async def wrapped(self, *args, **kwargs):
caller_id = 1234
_ThreadedLoopAdapter.start(caller_id)
_ThreadedLoopAdapter.start(id(test_method))
coro = test_method(self, *args, **kwargs)
asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result()
_ThreadedLoopAdapter.stop(caller_id)
future = asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop)
return future.result()

return wrapped

Expand All @@ -50,7 +49,7 @@ async def make_handler(settings_dict: Optional[dict] = None):
crawler = get_crawler(settings_dict=settings)
handler = ScrapyPlaywrightDownloadHandler(crawler=crawler)
try:
await handler._launch()
await handler._maybe_launch_in_thread()
except: # noqa (E722), pylint: disable=bare-except
pass
else:
Expand Down
11 changes: 2 additions & 9 deletions tests/tests_asyncio/test_browser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import platform
import random
import re
import subprocess
Expand Down Expand Up @@ -192,10 +191,7 @@ async def test_browser_closed_restart(self):
== 2 # one at the beginning, one after calling Browser.close() manually
)

@pytest.mark.skipif(
platform.system() == "Windows",
reason="os.kill does not work as expected on Windows",
)
@allow_windows
async def test_browser_crashed_restart(self):
spider = Spider("foo")
async with make_handler(settings_dict={"PLAYWRIGHT_BROWSER_TYPE": "chromium"}) as handler:
Expand Down Expand Up @@ -239,10 +235,7 @@ async def test_browser_crashed_restart(self):
== 2 # one at the beginning, one after killing the browser process
)

@pytest.mark.skipif(
platform.system() == "Windows",
reason="os.kill does not work as expected on Windows",
)
@allow_windows
async def test_browser_crashed_do_not_restart(self):
spider = Spider("foo")
settings_dict = {
Expand Down
Loading