From a4852c5c3fa8bbe9140360fbff134347a38d8803 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 12:50:02 -0300 Subject: [PATCH 01/12] Close thread loop adapter from the correct thread --- scrapy_playwright/_utils.py | 15 ++++++++++----- scrapy_playwright/handler.py | 6 ++++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/scrapy_playwright/_utils.py b/scrapy_playwright/_utils.py index 1b21d78..7ca8a12 100644 --- a/scrapy_playwright/_utils.py +++ b/scrapy_playwright/_utils.py @@ -140,8 +140,11 @@ def _deferred_from_coro(cls, coro) -> Deferred: return dfd @classmethod - def start(cls, caller_id: int) -> None: - cls._stop_events[caller_id] = asyncio.Event() + def start(cls, download_handler_id: int) -> None: + """Start the event loop in a new thread if not already started. + Should be called from the Scrapy thread. + """ + cls._stop_events[download_handler_id] = asyncio.Event() if not getattr(cls, "_loop", None): policy = asyncio.DefaultEventLoopPolicy() if platform.system() == "Windows": @@ -155,9 +158,11 @@ def start(cls, caller_id: int) -> None: asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop) @classmethod - def stop(cls, caller_id: int) -> None: - """Wait until all handlers are closed to stop the event loop and join the thread.""" - cls._stop_events[caller_id].set() + def stop(cls, download_handler_id: int) -> None: + """Wait until all handlers are closed to stop the event loop and join the thread. + Should be called from the Scrapy thread. + """ + cls._stop_events[download_handler_id].set() if all(ev.is_set() for ev in cls._stop_events.values()): asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop) cls._loop.call_soon_threadsafe(cls._loop.stop) diff --git a/scrapy_playwright/handler.py b/scrapy_playwright/handler.py index 741687f..bea9b74 100644 --- a/scrapy_playwright/handler.py +++ b/scrapy_playwright/handler.py @@ -363,6 +363,8 @@ async def close(self) -> None: logger.info("Closing download handler") await super().close() await self._close() + if self.config.use_threaded_loop: + _ThreadedLoopAdapter.stop(id(self)) else: @@ -371,6 +373,8 @@ def close(self) -> Deferred: # pylint: disable=invalid-overridden-method logger.info("Closing download handler") yield super().close() yield self._deferred_from_coro(self._close()) + if self.config.use_threaded_loop: + _ThreadedLoopAdapter.stop(id(self)) async def _close(self) -> None: with suppress(TargetClosedError): @@ -383,8 +387,6 @@ async def _close(self) -> None: await self.playwright_context_manager.__aexit__() if self.playwright: await self.playwright.stop() - if self.config.use_threaded_loop: - _ThreadedLoopAdapter.stop(id(self)) if _SCRAPY_ASYNC_API: From 7b4d36b8d5eb5faa5b1d615e299563cb17cf5abe Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 14:55:17 -0300 Subject: [PATCH 02/12] Handle futures in threaded loop --- pylintrc | 1 + scrapy_playwright/_utils.py | 43 +++++++++++++++++++++++++++++++----- scrapy_playwright/handler.py | 28 +++++++++++++++-------- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/pylintrc b/pylintrc index 9508187..0bd94e7 100644 --- a/pylintrc +++ b/pylintrc @@ -9,6 +9,7 @@ disable= too-few-public-methods, too-many-arguments, too-many-instance-attributes, + too-many-lines, # tests duplicate-code, import-outside-toplevel, diff --git a/scrapy_playwright/_utils.py b/scrapy_playwright/_utils.py index 7ca8a12..eda66b1 100644 --- a/scrapy_playwright/_utils.py +++ b/scrapy_playwright/_utils.py @@ -2,6 +2,7 @@ import logging import platform import threading +from dataclasses import dataclass from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union import scrapy @@ -104,6 +105,13 @@ async def _get_header_value( return None +@dataclass +class _QueueItem: + coro: Awaitable + promise: Deferred | asyncio.Future + loop: asyncio.AbstractEventLoop | None = None + + class _ThreadedLoopAdapter: """Utility class to start an asyncio event loop in a new thread and redirect coroutines. This allows to run Playwright in a different loop than the Scrapy crawler, allowing to @@ -116,29 +124,54 @@ class _ThreadedLoopAdapter: _stop_events: Dict[int, asyncio.Event] = {} @classmethod - async def _handle_coro(cls, coro: Awaitable, dfd: Deferred) -> None: + async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None: from twisted.internet import reactor + dfd: Deferred = queue_item.promise + try: - result = await coro + result = await queue_item.coro except Exception as exc: reactor.callFromThread(dfd.errback, failure.Failure(exc)) else: reactor.callFromThread(dfd.callback, result) + @classmethod + async def _handle_coro_future(cls, queue_item: _QueueItem) -> None: + future: asyncio.Future = queue_item.promise + assert queue_item.loop is not None # typing + try: + result = await queue_item.coro + except Exception as exc: + queue_item.loop.call_soon_threadsafe(future.set_exception, exc) + else: + queue_item.loop.call_soon_threadsafe(future.set_result, result) + @classmethod async def _process_queue(cls) -> None: while any(not ev.is_set() for ev in cls._stop_events.values()): - coro, dfd = await cls._coro_queue.get() - asyncio.create_task(cls._handle_coro(coro, dfd)) + queue_item = await cls._coro_queue.get() + if isinstance(queue_item.promise, asyncio.Future): + asyncio.create_task(cls._handle_coro_future(queue_item)) + elif isinstance(queue_item.promise, Deferred): + asyncio.create_task(cls._handle_coro_deferred(queue_item)) cls._coro_queue.task_done() @classmethod def _deferred_from_coro(cls, coro) -> Deferred: dfd: Deferred = Deferred() - asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, dfd)), cls._loop) + queue_item = _QueueItem(coro=coro, promise=dfd) + asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop) return dfd + @classmethod + def _future_from_coro(cls, coro) -> asyncio.Future: + target_loop = asyncio.get_running_loop() # Scrapy thread loop + future: asyncio.Future = asyncio.Future() + queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop) + asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop) + return future + @classmethod def start(cls, download_handler_id: int) -> None: """Start the event loop in a new thread if not already started. diff --git a/scrapy_playwright/handler.py b/scrapy_playwright/handler.py index bea9b74..4d51458 100644 --- a/scrapy_playwright/handler.py +++ b/scrapy_playwright/handler.py @@ -31,7 +31,7 @@ from scrapy.http.headers import Headers from scrapy.responsetypes import responsetypes from scrapy.settings import Settings -from scrapy.utils.defer import deferred_from_coro, maybe_deferred_to_future +from scrapy.utils.defer import deferred_from_coro from scrapy.utils.misc import load_object from scrapy.utils.reactor import verify_installed_reactor from twisted.internet.defer import Deferred, inlineCallbacks @@ -144,18 +144,21 @@ def __init__(self, crawler: Crawler) -> None: verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor") if _SCRAPY_ASYNC_API: super().__init__(crawler=crawler) - crawler.signals.connect(self._launch, signals.engine_started) else: super().__init__( # pylint: disable=unexpected-keyword-arg settings=crawler.settings, crawler=crawler ) - crawler.signals.connect(self._engine_started, signals.engine_started) self.stats = crawler.stats self.config = Config.from_settings(crawler.settings) if self.config.use_threaded_loop: _ThreadedLoopAdapter.start(id(self)) + if _SCRAPY_ASYNC_API: + crawler.signals.connect(self._maybe_launch_in_thread, signals.engine_started) + else: + crawler.signals.connect(self._engine_started, signals.engine_started) + self.browser_launch_lock = asyncio.Lock() self.context_launch_lock = asyncio.Lock() self.context_wrappers: Dict[str, BrowserContextWrapper] = {} @@ -186,10 +189,17 @@ def _deferred_from_coro(self, coro: Awaitable) -> Deferred: return _ThreadedLoopAdapter._deferred_from_coro(coro) return deferred_from_coro(coro) + def _future_from_coro(self, coro: Awaitable) -> asyncio.Future: + if self.config.use_threaded_loop: + return _ThreadedLoopAdapter._future_from_coro(coro) + return asyncio.ensure_future(coro) + def _engine_started(self) -> Deferred: - """Launch the browser. Use the engine_started signal as it supports returning deferreds.""" return self._deferred_from_coro(self._launch()) + async def _maybe_launch_in_thread(self) -> None: + await self._future_from_coro(self._launch()) + async def _launch(self) -> None: """Launch Playwright manager and configured startup context(s).""" logger.info("Starting download handler") @@ -362,7 +372,7 @@ def _set_max_concurrent_context_count(self): async def close(self) -> None: logger.info("Closing download handler") await super().close() - await self._close() + await self._future_from_coro(self._close()) if self.config.use_threaded_loop: _ThreadedLoopAdapter.stop(id(self)) @@ -392,9 +402,8 @@ async def _close(self) -> None: async def download_request(self, request: Request) -> Response: if request.meta.get("playwright"): - return await maybe_deferred_to_future( - self._deferred_from_coro(self._download_request(request, self._crawler.spider)) - ) + coro = self._download_request(request) + return await self._future_from_coro(coro) return await super().download_request( # pylint: disable=no-value-for-parameter request ) @@ -410,7 +419,8 @@ def download_request( # type: ignore[misc] # pylint: disable=invalid-overridden request=request, spider=spider ) - async def _download_request(self, request: Request, spider: Spider) -> Response: + async def _download_request(self, request: Request, spider: Spider | None = None) -> Response: + spider = spider or self._crawler.spider counter = 0 while True: try: From 11f47f5ac403a008ab63efe1bca40c0dab3b9801 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 15:23:50 -0300 Subject: [PATCH 03/12] Remove assert --- scrapy_playwright/_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scrapy_playwright/_utils.py b/scrapy_playwright/_utils.py index eda66b1..29a0d87 100644 --- a/scrapy_playwright/_utils.py +++ b/scrapy_playwright/_utils.py @@ -139,13 +139,13 @@ async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None: @classmethod async def _handle_coro_future(cls, queue_item: _QueueItem) -> None: future: asyncio.Future = queue_item.promise - assert queue_item.loop is not None # typing + loop: asyncio.AbstractEventLoop = queue_item.loop # type: ignore[assignment] try: result = await queue_item.coro except Exception as exc: - queue_item.loop.call_soon_threadsafe(future.set_exception, exc) + loop.call_soon_threadsafe(future.set_exception, exc) else: - queue_item.loop.call_soon_threadsafe(future.set_result, result) + loop.call_soon_threadsafe(future.set_result, result) @classmethod async def _process_queue(cls) -> None: From a0dfd8bce916a23ac7b7c3c11e2ea7ec42a35a82 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 16:43:27 -0300 Subject: [PATCH 04/12] Do not stop threaded loop adapter for each test method --- tests/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 24ba931..4ca8f88 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -26,11 +26,10 @@ def allow_windows(test_method): @wraps(test_method) async def wrapped(self, *args, **kwargs): - caller_id = 1234 - _ThreadedLoopAdapter.start(caller_id) + _ThreadedLoopAdapter.start(id(test_method)) coro = test_method(self, *args, **kwargs) - asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result() - _ThreadedLoopAdapter.stop(caller_id) + future = asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop) + return future.result() return wrapped From abff17ba9f0a4cee4f6c533353546ac9f0a07a1e Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 16:46:03 -0300 Subject: [PATCH 05/12] Update crashed browser tests to run on Windows --- tests/tests_asyncio/test_browser.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/tests/tests_asyncio/test_browser.py b/tests/tests_asyncio/test_browser.py index 30f9394..c249e32 100644 --- a/tests/tests_asyncio/test_browser.py +++ b/tests/tests_asyncio/test_browser.py @@ -1,6 +1,5 @@ import asyncio import logging -import platform import random import re import subprocess @@ -192,10 +191,7 @@ async def test_browser_closed_restart(self): == 2 # one at the beginning, one after calling Browser.close() manually ) - @pytest.mark.skipif( - platform.system() == "Windows", - reason="os.kill does not work as expected on Windows", - ) + @allow_windows async def test_browser_crashed_restart(self): spider = Spider("foo") async with make_handler(settings_dict={"PLAYWRIGHT_BROWSER_TYPE": "chromium"}) as handler: @@ -239,10 +235,7 @@ async def test_browser_crashed_restart(self): == 2 # one at the beginning, one after killing the broser process ) - @pytest.mark.skipif( - platform.system() == "Windows", - reason="os.kill does not work as expected on Windows", - ) + @allow_windows async def test_browser_crashed_do_not_restart(self): spider = Spider("foo") settings_dict = { From 5c8939785977eac355a484586ed91bc5bbbacf31 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 17:24:07 -0300 Subject: [PATCH 06/12] Tests: maybe launch in thread --- tests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/__init__.py b/tests/__init__.py index 4ca8f88..90afb8b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -49,7 +49,7 @@ async def make_handler(settings_dict: Optional[dict] = None): crawler = get_crawler(settings_dict=settings) handler = ScrapyPlaywrightDownloadHandler(crawler=crawler) try: - await handler._launch() + await handler._maybe_launch_in_thread() except: # noqa (E722), pylint: disable=bare-except pass else: From 2e4a9923b9c4d270be95e27f4b25498444f0a9af Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 17:31:07 -0300 Subject: [PATCH 07/12] CI: also run pinned dependency envs on Windows --- .github/workflows/tests.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 41bad7c..60e7dfc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,6 +39,14 @@ jobs: - name: Run twisted tests run: tox -e py-twisted + - name: Run pinned dependency tests (Windows, asyncio tests) + if: runner.os == 'Windows' + run: tox -e py-pinned + + - name: Run pinned dependency tests (Windows, Twisted tests) + if: runner.os == 'Windows' + run: tox -e py-pinned-twisted + - name: Upload coverage report (Linux) if: runner.os == 'Linux' env: From f34134dfb4df6bb253d59ea58f0501164bde9ba6 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 17:39:39 -0300 Subject: [PATCH 08/12] Do not return from async tests --- tests/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 90afb8b..88c67ab 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -28,8 +28,7 @@ def allow_windows(test_method): async def wrapped(self, *args, **kwargs): _ThreadedLoopAdapter.start(id(test_method)) coro = test_method(self, *args, **kwargs) - future = asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop) - return future.result() + asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result() return wrapped From 0c09c856e600d873b84b8c39f5fb866d4ff7f2d3 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 17:48:03 -0300 Subject: [PATCH 09/12] _maybe_future_from_coro --- scrapy_playwright/handler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scrapy_playwright/handler.py b/scrapy_playwright/handler.py index 4d51458..50d930b 100644 --- a/scrapy_playwright/handler.py +++ b/scrapy_playwright/handler.py @@ -189,16 +189,16 @@ def _deferred_from_coro(self, coro: Awaitable) -> Deferred: return _ThreadedLoopAdapter._deferred_from_coro(coro) return deferred_from_coro(coro) - def _future_from_coro(self, coro: Awaitable) -> asyncio.Future: + def _maybe_future_from_coro(self, coro: Awaitable) -> Awaitable | asyncio.Future: if self.config.use_threaded_loop: return _ThreadedLoopAdapter._future_from_coro(coro) - return asyncio.ensure_future(coro) + return coro def _engine_started(self) -> Deferred: return self._deferred_from_coro(self._launch()) async def _maybe_launch_in_thread(self) -> None: - await self._future_from_coro(self._launch()) + await self._maybe_future_from_coro(self._launch()) async def _launch(self) -> None: """Launch Playwright manager and configured startup context(s).""" @@ -372,7 +372,7 @@ def _set_max_concurrent_context_count(self): async def close(self) -> None: logger.info("Closing download handler") await super().close() - await self._future_from_coro(self._close()) + await self._maybe_future_from_coro(self._close()) if self.config.use_threaded_loop: _ThreadedLoopAdapter.stop(id(self)) @@ -403,7 +403,7 @@ async def _close(self) -> None: async def download_request(self, request: Request) -> Response: if request.meta.get("playwright"): coro = self._download_request(request) - return await self._future_from_coro(coro) + return await self._maybe_future_from_coro(coro) return await super().download_request( # pylint: disable=no-value-for-parameter request ) From f7b3056ad81468a9c96e2604151025dd96799b93 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Tue, 20 Jan 2026 17:56:05 -0300 Subject: [PATCH 10/12] Add missing type hints --- scrapy_playwright/_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scrapy_playwright/_utils.py b/scrapy_playwright/_utils.py index 29a0d87..ee186d9 100644 --- a/scrapy_playwright/_utils.py +++ b/scrapy_playwright/_utils.py @@ -158,14 +158,14 @@ async def _process_queue(cls) -> None: cls._coro_queue.task_done() @classmethod - def _deferred_from_coro(cls, coro) -> Deferred: + def _deferred_from_coro(cls, coro: Awaitable) -> Deferred: dfd: Deferred = Deferred() queue_item = _QueueItem(coro=coro, promise=dfd) asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop) return dfd @classmethod - def _future_from_coro(cls, coro) -> asyncio.Future: + def _future_from_coro(cls, coro: Awaitable) -> asyncio.Future: target_loop = asyncio.get_running_loop() # Scrapy thread loop future: asyncio.Future = asyncio.Future() queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop) From 94f88ff7d69bd884622de85b78aa1421ca4d05e3 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Wed, 21 Jan 2026 07:41:16 -0300 Subject: [PATCH 11/12] Add reruns=2 to all tests --- pyproject.toml | 3 +++ tests/tests_asyncio/test_browser.py | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 57a5583..9cdf736 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,2 +1,5 @@ [tool.black] line-length = 99 + +[tool.pytest.ini_options] +reruns = "2" diff --git a/tests/tests_asyncio/test_browser.py b/tests/tests_asyncio/test_browser.py index c249e32..2d5e208 100644 --- a/tests/tests_asyncio/test_browser.py +++ b/tests/tests_asyncio/test_browser.py @@ -105,7 +105,6 @@ async def test_connect_devtools(self): "Connecting to remote browser, ignoring PLAYWRIGHT_LAUNCH_OPTIONS", ) in self._caplog.record_tuples - @pytest.mark.flaky(reruns=3) @allow_windows async def test_connect(self): async with remote_chromium(with_devtools_protocol=False) as browser_url: From 267cbdecc79f237b32190663b363f0de4f4ffe25 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta Date: Wed, 21 Jan 2026 07:43:50 -0300 Subject: [PATCH 12/12] Adjust expected failure message --- tests/tests_twisted/test_mixed_requests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_twisted/test_mixed_requests.py b/tests/tests_twisted/test_mixed_requests.py index 627ef45..901735f 100644 --- a/tests/tests_twisted/test_mixed_requests.py +++ b/tests/tests_twisted/test_mixed_requests.py @@ -56,7 +56,7 @@ def _check_playwright_error(failure, url): # different errors depending on the platform self.assertTrue( f"net::ERR_CONNECTION_REFUSED at {url}" in str(failure.value) - or f"Page.goto: Timeout {self.timeout_ms}ms exceeded" in str(failure.value) + or f"Timeout {self.timeout_ms}ms exceeded" in str(failure.value) ) req1 = Request(self.server.urljoin("/index.html"))