diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 41bad7c..60e7dfc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,6 +39,14 @@ jobs: - name: Run twisted tests run: tox -e py-twisted + - name: Run pinned dependency tests (Windows, asyncio tests) + if: runner.os == 'Windows' + run: tox -e py-pinned + + - name: Run pinned dependency tests (Windows, Twisted tests) + if: runner.os == 'Windows' + run: tox -e py-pinned-twisted + - name: Upload coverage report (Linux) if: runner.os == 'Linux' env: diff --git a/pylintrc b/pylintrc index 9508187..0bd94e7 100644 --- a/pylintrc +++ b/pylintrc @@ -9,6 +9,7 @@ disable= too-few-public-methods, too-many-arguments, too-many-instance-attributes, + too-many-lines, # tests duplicate-code, import-outside-toplevel, diff --git a/pyproject.toml b/pyproject.toml index 57a5583..9cdf736 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,2 +1,5 @@ [tool.black] line-length = 99 + +[tool.pytest.ini_options] +reruns = "2" diff --git a/scrapy_playwright/_utils.py b/scrapy_playwright/_utils.py index 1b21d78..ee186d9 100644 --- a/scrapy_playwright/_utils.py +++ b/scrapy_playwright/_utils.py @@ -2,6 +2,7 @@ import logging import platform import threading +from dataclasses import dataclass from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union import scrapy @@ -104,6 +105,13 @@ async def _get_header_value( return None +@dataclass +class _QueueItem: + coro: Awaitable + promise: Deferred | asyncio.Future + loop: asyncio.AbstractEventLoop | None = None + + class _ThreadedLoopAdapter: """Utility class to start an asyncio event loop in a new thread and redirect coroutines. This allows to run Playwright in a different loop than the Scrapy crawler, allowing to @@ -116,32 +124,60 @@ class _ThreadedLoopAdapter: _stop_events: Dict[int, asyncio.Event] = {} @classmethod - async def _handle_coro(cls, coro: Awaitable, dfd: Deferred) -> None: + async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None: from twisted.internet import reactor + dfd: Deferred = queue_item.promise + try: - result = await coro + result = await queue_item.coro except Exception as exc: reactor.callFromThread(dfd.errback, failure.Failure(exc)) else: reactor.callFromThread(dfd.callback, result) + @classmethod + async def _handle_coro_future(cls, queue_item: _QueueItem) -> None: + future: asyncio.Future = queue_item.promise + loop: asyncio.AbstractEventLoop = queue_item.loop # type: ignore[assignment] + try: + result = await queue_item.coro + except Exception as exc: + loop.call_soon_threadsafe(future.set_exception, exc) + else: + loop.call_soon_threadsafe(future.set_result, result) + @classmethod async def _process_queue(cls) -> None: while any(not ev.is_set() for ev in cls._stop_events.values()): - coro, dfd = await cls._coro_queue.get() - asyncio.create_task(cls._handle_coro(coro, dfd)) + queue_item = await cls._coro_queue.get() + if isinstance(queue_item.promise, asyncio.Future): + asyncio.create_task(cls._handle_coro_future(queue_item)) + elif isinstance(queue_item.promise, Deferred): + asyncio.create_task(cls._handle_coro_deferred(queue_item)) cls._coro_queue.task_done() @classmethod - def _deferred_from_coro(cls, coro) -> Deferred: + def _deferred_from_coro(cls, coro: Awaitable) -> Deferred: dfd: Deferred = Deferred() - asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, dfd)), cls._loop) + queue_item = _QueueItem(coro=coro, promise=dfd) + asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop) return dfd @classmethod - def start(cls, caller_id: int) -> None: - cls._stop_events[caller_id] = asyncio.Event() + def _future_from_coro(cls, coro: Awaitable) -> asyncio.Future: + target_loop = asyncio.get_running_loop() # Scrapy thread loop + future: asyncio.Future = asyncio.Future() + queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop) + asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop) + return future + + @classmethod + def start(cls, download_handler_id: int) -> None: + """Start the event loop in a new thread if not already started. + Should be called from the Scrapy thread. + """ + cls._stop_events[download_handler_id] = asyncio.Event() if not getattr(cls, "_loop", None): policy = asyncio.DefaultEventLoopPolicy() if platform.system() == "Windows": @@ -155,9 +191,11 @@ def start(cls, caller_id: int) -> None: asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop) @classmethod - def stop(cls, caller_id: int) -> None: - """Wait until all handlers are closed to stop the event loop and join the thread.""" - cls._stop_events[caller_id].set() + def stop(cls, download_handler_id: int) -> None: + """Wait until all handlers are closed to stop the event loop and join the thread. + Should be called from the Scrapy thread. + """ + cls._stop_events[download_handler_id].set() if all(ev.is_set() for ev in cls._stop_events.values()): asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop) cls._loop.call_soon_threadsafe(cls._loop.stop) diff --git a/scrapy_playwright/handler.py b/scrapy_playwright/handler.py index 741687f..50d930b 100644 --- a/scrapy_playwright/handler.py +++ b/scrapy_playwright/handler.py @@ -31,7 +31,7 @@ from scrapy.http.headers import Headers from scrapy.responsetypes import responsetypes from scrapy.settings import Settings -from scrapy.utils.defer import deferred_from_coro, maybe_deferred_to_future +from scrapy.utils.defer import deferred_from_coro from scrapy.utils.misc import load_object from scrapy.utils.reactor import verify_installed_reactor from twisted.internet.defer import Deferred, inlineCallbacks @@ -144,18 +144,21 @@ def __init__(self, crawler: Crawler) -> None: verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor") if _SCRAPY_ASYNC_API: super().__init__(crawler=crawler) - crawler.signals.connect(self._launch, signals.engine_started) else: super().__init__( # pylint: disable=unexpected-keyword-arg settings=crawler.settings, crawler=crawler ) - crawler.signals.connect(self._engine_started, signals.engine_started) self.stats = crawler.stats self.config = Config.from_settings(crawler.settings) if self.config.use_threaded_loop: _ThreadedLoopAdapter.start(id(self)) + if _SCRAPY_ASYNC_API: + crawler.signals.connect(self._maybe_launch_in_thread, signals.engine_started) + else: + crawler.signals.connect(self._engine_started, signals.engine_started) + self.browser_launch_lock = asyncio.Lock() self.context_launch_lock = asyncio.Lock() self.context_wrappers: Dict[str, BrowserContextWrapper] = {} @@ -186,10 +189,17 @@ def _deferred_from_coro(self, coro: Awaitable) -> Deferred: return _ThreadedLoopAdapter._deferred_from_coro(coro) return deferred_from_coro(coro) + def _maybe_future_from_coro(self, coro: Awaitable) -> Awaitable | asyncio.Future: + if self.config.use_threaded_loop: + return _ThreadedLoopAdapter._future_from_coro(coro) + return coro + def _engine_started(self) -> Deferred: - """Launch the browser. Use the engine_started signal as it supports returning deferreds.""" return self._deferred_from_coro(self._launch()) + async def _maybe_launch_in_thread(self) -> None: + await self._maybe_future_from_coro(self._launch()) + async def _launch(self) -> None: """Launch Playwright manager and configured startup context(s).""" logger.info("Starting download handler") @@ -362,7 +372,9 @@ def _set_max_concurrent_context_count(self): async def close(self) -> None: logger.info("Closing download handler") await super().close() - await self._close() + await self._maybe_future_from_coro(self._close()) + if self.config.use_threaded_loop: + _ThreadedLoopAdapter.stop(id(self)) else: @@ -371,6 +383,8 @@ def close(self) -> Deferred: # pylint: disable=invalid-overridden-method logger.info("Closing download handler") yield super().close() yield self._deferred_from_coro(self._close()) + if self.config.use_threaded_loop: + _ThreadedLoopAdapter.stop(id(self)) async def _close(self) -> None: with suppress(TargetClosedError): @@ -383,16 +397,13 @@ async def _close(self) -> None: await self.playwright_context_manager.__aexit__() if self.playwright: await self.playwright.stop() - if self.config.use_threaded_loop: - _ThreadedLoopAdapter.stop(id(self)) if _SCRAPY_ASYNC_API: async def download_request(self, request: Request) -> Response: if request.meta.get("playwright"): - return await maybe_deferred_to_future( - self._deferred_from_coro(self._download_request(request, self._crawler.spider)) - ) + coro = self._download_request(request) + return await self._maybe_future_from_coro(coro) return await super().download_request( # pylint: disable=no-value-for-parameter request ) @@ -408,7 +419,8 @@ def download_request( # type: ignore[misc] # pylint: disable=invalid-overridden request=request, spider=spider ) - async def _download_request(self, request: Request, spider: Spider) -> Response: + async def _download_request(self, request: Request, spider: Spider | None = None) -> Response: + spider = spider or self._crawler.spider counter = 0 while True: try: diff --git a/tests/__init__.py b/tests/__init__.py index 24ba931..88c67ab 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -26,11 +26,9 @@ def allow_windows(test_method): @wraps(test_method) async def wrapped(self, *args, **kwargs): - caller_id = 1234 - _ThreadedLoopAdapter.start(caller_id) + _ThreadedLoopAdapter.start(id(test_method)) coro = test_method(self, *args, **kwargs) asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result() - _ThreadedLoopAdapter.stop(caller_id) return wrapped @@ -50,7 +48,7 @@ async def make_handler(settings_dict: Optional[dict] = None): crawler = get_crawler(settings_dict=settings) handler = ScrapyPlaywrightDownloadHandler(crawler=crawler) try: - await handler._launch() + await handler._maybe_launch_in_thread() except: # noqa (E722), pylint: disable=bare-except pass else: diff --git a/tests/tests_asyncio/test_browser.py b/tests/tests_asyncio/test_browser.py index 30f9394..2d5e208 100644 --- a/tests/tests_asyncio/test_browser.py +++ b/tests/tests_asyncio/test_browser.py @@ -1,6 +1,5 @@ import asyncio import logging -import platform import random import re import subprocess @@ -106,7 +105,6 @@ async def test_connect_devtools(self): "Connecting to remote browser, ignoring PLAYWRIGHT_LAUNCH_OPTIONS", ) in self._caplog.record_tuples - @pytest.mark.flaky(reruns=3) @allow_windows async def test_connect(self): async with remote_chromium(with_devtools_protocol=False) as browser_url: @@ -192,10 +190,7 @@ async def test_browser_closed_restart(self): == 2 # one at the beginning, one after calling Browser.close() manually ) - @pytest.mark.skipif( - platform.system() == "Windows", - reason="os.kill does not work as expected on Windows", - ) + @allow_windows async def test_browser_crashed_restart(self): spider = Spider("foo") async with make_handler(settings_dict={"PLAYWRIGHT_BROWSER_TYPE": "chromium"}) as handler: @@ -239,10 +234,7 @@ async def test_browser_crashed_restart(self): == 2 # one at the beginning, one after killing the broser process ) - @pytest.mark.skipif( - platform.system() == "Windows", - reason="os.kill does not work as expected on Windows", - ) + @allow_windows async def test_browser_crashed_do_not_restart(self): spider = Spider("foo") settings_dict = { diff --git a/tests/tests_twisted/test_mixed_requests.py b/tests/tests_twisted/test_mixed_requests.py index 627ef45..901735f 100644 --- a/tests/tests_twisted/test_mixed_requests.py +++ b/tests/tests_twisted/test_mixed_requests.py @@ -56,7 +56,7 @@ def _check_playwright_error(failure, url): # different errors depending on the platform self.assertTrue( f"net::ERR_CONNECTION_REFUSED at {url}" in str(failure.value) - or f"Page.goto: Timeout {self.timeout_ms}ms exceeded" in str(failure.value) + or f"Timeout {self.timeout_ms}ms exceeded" in str(failure.value) ) req1 = Request(self.server.urljoin("/index.html"))