Skip to content

Commit 7b4d36b

Browse files
committed
Handle futures in threaded loop
1 parent a4852c5 commit 7b4d36b

File tree

3 files changed

+58
-14
lines changed

3 files changed

+58
-14
lines changed

pylintrc

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -9,6 +9,7 @@ disable=
99
too-few-public-methods,
1010
too-many-arguments,
1111
too-many-instance-attributes,
12+
too-many-lines,
1213
# tests
1314
duplicate-code,
1415
import-outside-toplevel,

scrapy_playwright/_utils.py

Lines changed: 38 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -2,6 +2,7 @@
22
import logging
33
import platform
44
import threading
5+
from dataclasses import dataclass
56
from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union
67

78
import scrapy
@@ -104,6 +105,13 @@ async def _get_header_value(
104105
return None
105106

106107

108+
@dataclass
109+
class _QueueItem:
110+
coro: Awaitable
111+
promise: Deferred | asyncio.Future
112+
loop: asyncio.AbstractEventLoop | None = None
113+
114+
107115
class _ThreadedLoopAdapter:
108116
"""Utility class to start an asyncio event loop in a new thread and redirect coroutines.
109117
This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
@@ -116,29 +124,54 @@ class _ThreadedLoopAdapter:
116124
_stop_events: Dict[int, asyncio.Event] = {}
117125

118126
@classmethod
119-
async def _handle_coro(cls, coro: Awaitable, dfd: Deferred) -> None:
127+
async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None:
120128
from twisted.internet import reactor
121129

130+
dfd: Deferred = queue_item.promise
131+
122132
try:
123-
result = await coro
133+
result = await queue_item.coro
124134
except Exception as exc:
125135
reactor.callFromThread(dfd.errback, failure.Failure(exc))
126136
else:
127137
reactor.callFromThread(dfd.callback, result)
128138

139+
@classmethod
140+
async def _handle_coro_future(cls, queue_item: _QueueItem) -> None:
141+
future: asyncio.Future = queue_item.promise
142+
assert queue_item.loop is not None # typing
143+
try:
144+
result = await queue_item.coro
145+
except Exception as exc:
146+
queue_item.loop.call_soon_threadsafe(future.set_exception, exc)
147+
else:
148+
queue_item.loop.call_soon_threadsafe(future.set_result, result)
149+
129150
@classmethod
130151
async def _process_queue(cls) -> None:
131152
while any(not ev.is_set() for ev in cls._stop_events.values()):
132-
coro, dfd = await cls._coro_queue.get()
133-
asyncio.create_task(cls._handle_coro(coro, dfd))
153+
queue_item = await cls._coro_queue.get()
154+
if isinstance(queue_item.promise, asyncio.Future):
155+
asyncio.create_task(cls._handle_coro_future(queue_item))
156+
elif isinstance(queue_item.promise, Deferred):
157+
asyncio.create_task(cls._handle_coro_deferred(queue_item))
134158
cls._coro_queue.task_done()
135159

136160
@classmethod
137161
def _deferred_from_coro(cls, coro) -> Deferred:
138162
dfd: Deferred = Deferred()
139-
asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, dfd)), cls._loop)
163+
queue_item = _QueueItem(coro=coro, promise=dfd)
164+
asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
140165
return dfd
141166

167+
@classmethod
168+
def _future_from_coro(cls, coro) -> asyncio.Future:
169+
target_loop = asyncio.get_running_loop() # Scrapy thread loop
170+
future: asyncio.Future = asyncio.Future()
171+
queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop)
172+
asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
173+
return future
174+
142175
@classmethod
143176
def start(cls, download_handler_id: int) -> None:
144177
"""Start the event loop in a new thread if not already started.

scrapy_playwright/handler.py

Lines changed: 19 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -31,7 +31,7 @@
3131
from scrapy.http.headers import Headers
3232
from scrapy.responsetypes import responsetypes
3333
from scrapy.settings import Settings
34-
from scrapy.utils.defer import deferred_from_coro, maybe_deferred_to_future
34+
from scrapy.utils.defer import deferred_from_coro
3535
from scrapy.utils.misc import load_object
3636
from scrapy.utils.reactor import verify_installed_reactor
3737
from twisted.internet.defer import Deferred, inlineCallbacks
@@ -144,18 +144,21 @@ def __init__(self, crawler: Crawler) -> None:
144144
verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor")
145145
if _SCRAPY_ASYNC_API:
146146
super().__init__(crawler=crawler)
147-
crawler.signals.connect(self._launch, signals.engine_started)
148147
else:
149148
super().__init__( # pylint: disable=unexpected-keyword-arg
150149
settings=crawler.settings, crawler=crawler
151150
)
152-
crawler.signals.connect(self._engine_started, signals.engine_started)
153151
self.stats = crawler.stats
154152
self.config = Config.from_settings(crawler.settings)
155153

156154
if self.config.use_threaded_loop:
157155
_ThreadedLoopAdapter.start(id(self))
158156

157+
if _SCRAPY_ASYNC_API:
158+
crawler.signals.connect(self._maybe_launch_in_thread, signals.engine_started)
159+
else:
160+
crawler.signals.connect(self._engine_started, signals.engine_started)
161+
159162
self.browser_launch_lock = asyncio.Lock()
160163
self.context_launch_lock = asyncio.Lock()
161164
self.context_wrappers: Dict[str, BrowserContextWrapper] = {}
@@ -186,10 +189,17 @@ def _deferred_from_coro(self, coro: Awaitable) -> Deferred:
186189
return _ThreadedLoopAdapter._deferred_from_coro(coro)
187190
return deferred_from_coro(coro)
188191

192+
def _future_from_coro(self, coro: Awaitable) -> asyncio.Future:
193+
if self.config.use_threaded_loop:
194+
return _ThreadedLoopAdapter._future_from_coro(coro)
195+
return asyncio.ensure_future(coro)
196+
189197
def _engine_started(self) -> Deferred:
190-
"""Launch the browser. Use the engine_started signal as it supports returning deferreds."""
191198
return self._deferred_from_coro(self._launch())
192199

200+
async def _maybe_launch_in_thread(self) -> None:
201+
await self._future_from_coro(self._launch())
202+
193203
async def _launch(self) -> None:
194204
"""Launch Playwright manager and configured startup context(s)."""
195205
logger.info("Starting download handler")
@@ -362,7 +372,7 @@ def _set_max_concurrent_context_count(self):
362372
async def close(self) -> None:
363373
logger.info("Closing download handler")
364374
await super().close()
365-
await self._close()
375+
await self._future_from_coro(self._close())
366376
if self.config.use_threaded_loop:
367377
_ThreadedLoopAdapter.stop(id(self))
368378

@@ -392,9 +402,8 @@ async def _close(self) -> None:
392402

393403
async def download_request(self, request: Request) -> Response:
394404
if request.meta.get("playwright"):
395-
return await maybe_deferred_to_future(
396-
self._deferred_from_coro(self._download_request(request, self._crawler.spider))
397-
)
405+
coro = self._download_request(request)
406+
return await self._future_from_coro(coro)
398407
return await super().download_request( # pylint: disable=no-value-for-parameter
399408
request
400409
)
@@ -410,7 +419,8 @@ def download_request( # type: ignore[misc] # pylint: disable=invalid-overridden
410419
request=request, spider=spider
411420
)
412421

413-
async def _download_request(self, request: Request, spider: Spider) -> Response:
422+
async def _download_request(self, request: Request, spider: Spider | None = None) -> Response:
423+
spider = spider or self._crawler.spider
414424
counter = 0
415425
while True:
416426
try:

0 commit comments

Comments (0)