-
Notifications
You must be signed in to change notification settings - Fork 158
Expand file tree
/
Copy path_utils.py
More file actions
202 lines (168 loc) · 6.97 KB
/
_utils.py
File metadata and controls
202 lines (168 loc) · 6.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import asyncio
import logging
import platform
import threading
from dataclasses import dataclass
from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union
import scrapy
from playwright.async_api import Error, Page, Request, Response
from scrapy.http.headers import Headers
from scrapy.settings import Settings
from scrapy.utils.python import to_unicode
from twisted.internet.defer import Deferred
from twisted.python import failure
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding
# Shared logger for the package; uses the fixed name "scrapy-playwright"
# (not __name__) so all components log under a single channel.
logger = logging.getLogger("scrapy-playwright")
async def _maybe_await(obj):
if isinstance(obj, Awaitable):
return await obj
return obj
def _possible_encodings(headers: Headers, text: str) -> Iterator[str]:
    """Yield candidate encodings for *text*: first the one declared in the
    Content-Type header (when present), then the one declared in the HTML
    body. Either candidate may be None; callers are expected to filter.
    """
    content_type_header = headers.get("content-type")
    if content_type_header:
        yield http_content_type_encoding(to_unicode(headers["content-type"]))
    yield html_body_declared_encoding(text)
def _encode_body(headers: Headers, text: str) -> Tuple[bytes, str]:
    """Encode *text* with the first declared encoding that can represent it.

    Candidates come from _possible_encodings; falsy candidates are skipped.
    Returns a (body, encoding) pair, falling back to UTF-8 when no declared
    encoding works.
    """
    candidates = (enc for enc in _possible_encodings(headers, text) if enc)
    for candidate in candidates:
        try:
            return text.encode(candidate), candidate
        except UnicodeEncodeError:
            # Declared encoding cannot represent the text; try the next one.
            continue
    return text.encode("utf-8"), "utf-8"  # fallback
def _is_safe_close_error(error: Error) -> bool:
"""
Taken almost verbatim from
https://github.com/microsoft/playwright-python/blob/v1.20.0/playwright/_impl/_helper.py#L234-L238
"""
message = str(error)
return message.endswith("Browser has been closed") or message.endswith(
"Target page, context or browser has been closed"
)
# Message fragment raised by Playwright when Page.content() races an ongoing
# navigation; matched by _get_page_content to decide that a retry is safe.
_NAVIGATION_ERROR_MSG = (
    "Unable to retrieve content because the page is navigating and changing the content."
)
async def _get_page_content(
    page: Page,
    spider: scrapy.Spider,
    context_name: str,
    scrapy_request_url: str,
    scrapy_request_method: str,
) -> str:
    """Wrapper around Page.content that retries once on navigation errors.

    Arguments other than the page are only used for logging context.
    Raises any Playwright Error that is not the known navigation race.
    """
    try:
        return await page.content()
    except Error as err:
        # Re-raise anything other than the known "page is navigating" race.
        if _NAVIGATION_ERROR_MSG not in err.message:
            raise
        logger.debug(
            "Retrying to get content from page '%s', error: '%s'",
            page.url,
            _NAVIGATION_ERROR_MSG,
            extra={
                "spider": spider,
                "context_name": context_name,
                "scrapy_request_url": scrapy_request_url,
                "scrapy_request_method": scrapy_request_method,
                "playwright_page_url": page.url,
            },
        )
        return await page.content()
def _get_float_setting(settings: Settings, key: str) -> Optional[float]:
try:
return float(settings[key])
except Exception:
return None
async def _get_header_value(
resource: Union[Request, Response],
header_name: str,
) -> Optional[str]:
try:
return await resource.header_value(header_name)
except Exception:
return None
@dataclass
class _QueueItem:
    """A coroutine queued for execution on the threaded event loop, plus the
    promise used to deliver its result back to the caller."""
    # Coroutine to be awaited on the adapter's dedicated event loop.
    coro: Awaitable
    # Twisted Deferred or asyncio Future that receives the result/exception.
    promise: Deferred | asyncio.Future
    # Loop the future belongs to; set only when promise is an asyncio.Future,
    # used to deliver the result back via call_soon_threadsafe.
    loop: asyncio.AbstractEventLoop | None = None
class _ThreadedLoopAdapter:
    """Utility class to start an asyncio event loop in a new thread and redirect coroutines.
    This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
    use ProactorEventLoop which is supported by Playwright on Windows.
    """

    # Event loop running on the dedicated thread (created lazily in start()).
    _loop: asyncio.AbstractEventLoop
    # Thread executing _loop.run_forever().
    _thread: threading.Thread
    # Queue of _QueueItem objects awaiting execution on _loop.
    _coro_queue: asyncio.Queue = asyncio.Queue()
    # One stop event per download handler id; the loop is only stopped once
    # every registered handler has set its event (see stop()).
    _stop_events: Dict[int, asyncio.Event] = {}

    @classmethod
    async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None:
        """Await the queued coroutine and fire its Twisted Deferred.

        Runs on the threaded loop; the Deferred must be fired from the
        reactor's own thread, hence reactor.callFromThread.
        """
        from twisted.internet import reactor
        dfd: Deferred = queue_item.promise
        try:
            result = await queue_item.coro
        except Exception as exc:
            # Wrap the exception in a Failure so errback callers get the
            # usual Twisted error object.
            reactor.callFromThread(dfd.errback, failure.Failure(exc))
        else:
            reactor.callFromThread(dfd.callback, result)

    @classmethod
    async def _handle_coro_future(cls, queue_item: _QueueItem) -> None:
        """Await the queued coroutine and resolve its asyncio Future.

        Runs on the threaded loop; the result/exception is delivered back to
        the caller's loop (queue_item.loop) via call_soon_threadsafe.
        """
        future: asyncio.Future = queue_item.promise
        loop: asyncio.AbstractEventLoop = queue_item.loop  # type: ignore[assignment]
        try:
            result = await queue_item.coro
        except Exception as exc:
            loop.call_soon_threadsafe(future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(future.set_result, result)

    @classmethod
    async def _process_queue(cls) -> None:
        """Consume queued items and dispatch them by promise type.

        Keeps running while at least one handler's stop event is unset.
        NOTE(review): task_done() is called as soon as the item is scheduled
        as a task, not when the coroutine finishes — so queue.join() in
        stop() only waits for dispatch, not completion.
        """
        while any(not ev.is_set() for ev in cls._stop_events.values()):
            queue_item = await cls._coro_queue.get()
            if isinstance(queue_item.promise, asyncio.Future):
                asyncio.create_task(cls._handle_coro_future(queue_item))
            elif isinstance(queue_item.promise, Deferred):
                asyncio.create_task(cls._handle_coro_deferred(queue_item))
            cls._coro_queue.task_done()

    @classmethod
    def _deferred_from_coro(cls, coro: Awaitable) -> Deferred:
        """Schedule *coro* on the threaded loop; return a Deferred for its result."""
        dfd: Deferred = Deferred()
        queue_item = _QueueItem(coro=coro, promise=dfd)
        # Thread-safe handoff: enqueue from the calling thread into the
        # queue owned by the threaded loop.
        asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
        return dfd

    @classmethod
    def _future_from_coro(cls, coro: Awaitable) -> asyncio.Future:
        """Schedule *coro* on the threaded loop; return a Future for its result.

        Must be called from a thread with a running event loop (the Scrapy
        thread) — that loop is recorded so the result can be delivered back.
        """
        target_loop = asyncio.get_running_loop()  # Scrapy thread loop
        future: asyncio.Future = asyncio.Future()
        queue_item = _QueueItem(coro=coro, promise=future, loop=target_loop)
        asyncio.run_coroutine_threadsafe(cls._coro_queue.put(queue_item), cls._loop)
        return future

    @classmethod
    def start(cls, download_handler_id: int) -> None:
        """Start the event loop in a new thread if not already started.
        Should be called from the Scrapy thread.
        """
        cls._stop_events[download_handler_id] = asyncio.Event()
        if not getattr(cls, "_loop", None):
            policy = asyncio.DefaultEventLoopPolicy()
            if platform.system() == "Windows":
                # ProactorEventLoop is required by Playwright on Windows.
                policy = asyncio.WindowsProactorEventLoopPolicy()  # type: ignore[attr-defined]
            cls._loop = policy.new_event_loop()
        if not getattr(cls, "_thread", None):
            # Daemon thread so an abrupt interpreter exit is not blocked.
            cls._thread = threading.Thread(target=cls._loop.run_forever, daemon=True)
            cls._thread.start()
            logger.info("Started loop on separate thread: %s", cls._loop)
            asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop)

    @classmethod
    def stop(cls, download_handler_id: int) -> None:
        """Wait until all handlers are closed to stop the event loop and join the thread.
        Should be called from the Scrapy thread.
        """
        cls._stop_events[download_handler_id].set()
        # Only the last handler to stop actually tears the loop down.
        if all(ev.is_set() for ev in cls._stop_events.values()):
            asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop)
            cls._loop.call_soon_threadsafe(cls._loop.stop)
            cls._thread.join()