|
1 | 1 | from datetime import date, datetime, timezone, timedelta |
2 | | -from io import BytesIO |
3 | 2 | from itertools import islice |
4 | 3 | from pathlib import Path |
5 | 4 | import time |
6 | 5 | import pytest |
7 | 6 | from unittest import mock |
8 | | -from urllib.parse import urlparse, ParseResult, parse_qs |
9 | | -from urllib3 import (HTTPConnectionPool, |
10 | | - HTTPResponse, |
| 7 | +from urllib3 import (HTTPResponse, |
11 | 8 | Timeout as Urllib3Timeout) |
12 | 9 | # The Header dict lives in a different place for urllib3 v2: |
13 | 10 | try: |
@@ -229,79 +226,6 @@ def test_search_with_filter_tuple(): |
229 | 226 | assert all(('feature' in v.url for v in versions)) |
230 | 227 |
|
231 | 228 |
|
232 | | -class Urllib3MockManager: |
233 | | - def __init__(self) -> None: |
234 | | - self.responses = [] |
235 | | - |
236 | | - def get(self, url, responses) -> None: |
237 | | - url_info = urlparse(url) |
238 | | - if url_info.path == '': |
239 | | - url_info = url_info._replace(path='/') |
240 | | - for index, response in enumerate(responses): |
241 | | - repeat = True if index == len(responses) - 1 else False |
242 | | - self.responses.append(('GET', url_info, response, repeat)) |
243 | | - |
244 | | - def _compare_querystrings(self, actual, candidate): |
245 | | - for k, v in candidate.items(): |
246 | | - if k not in actual or actual[k] != v: |
247 | | - return False |
248 | | - return True |
249 | | - |
250 | | - def urlopen(self, pool: HTTPConnectionPool, method, url, *args, preload_content: bool = True, **kwargs): |
251 | | - opened_url = urlparse(url) |
252 | | - opened_path = opened_url.path or '/' |
253 | | - opened_query = parse_qs(opened_url.query) |
254 | | - for index, candidate in enumerate(self.responses): |
255 | | - candidate_url: ParseResult = candidate[1] |
256 | | - if ( |
257 | | - method == candidate[0] |
258 | | - and (not candidate_url.scheme or candidate_url.scheme == pool.scheme) |
259 | | - and (not candidate_url.hostname or candidate_url.hostname == pool.host) |
260 | | - and (not candidate_url.port or candidate_url.port == pool.port) |
261 | | - and candidate_url.path == opened_path |
262 | | - # This is cheap, ideally we'd parse the querystrings. |
263 | | - # and parse_qs(candidate_url.query) == opened_query |
264 | | - and self._compare_querystrings(opened_query, parse_qs(candidate_url.query)) |
265 | | - ): |
266 | | - if not candidate[3]: |
267 | | - self.responses.pop(index) |
268 | | - |
269 | | - data = candidate[2] |
270 | | - if data.get('exc'): |
271 | | - raise data['exc']() |
272 | | - |
273 | | - content = data.get('content') |
274 | | - if content is None: |
275 | | - content = data.get('text', '').encode() |
276 | | - |
277 | | - return HTTPResponse( |
278 | | - body=BytesIO(content), |
279 | | - headers=HTTPHeaderDict(data.get('headers', {})), |
280 | | - status=data.get('status_code', 200), |
281 | | - decode_content=False, |
282 | | - preload_content=preload_content, |
283 | | - ) |
284 | | - |
285 | | - # No matches! |
286 | | - raise RuntimeError( |
287 | | - f"No HTTP mocks matched {method} {pool.scheme}://{pool.host}{url}" |
288 | | - ) |
289 | | - |
290 | | - |
291 | | -@pytest.fixture |
292 | | -def urllib3_mock(monkeypatch): |
293 | | - manager = Urllib3MockManager() |
294 | | - |
295 | | - def urlopen_mock(self, method, url, *args, preload_content: bool = True, **kwargs): |
296 | | - return manager.urlopen(self, method, url, *args, preload_content=preload_content, **kwargs) |
297 | | - |
298 | | - monkeypatch.setattr( |
299 | | - "urllib3.connectionpool.HTTPConnectionPool.urlopen", urlopen_mock |
300 | | - ) |
301 | | - |
302 | | - return manager |
303 | | - |
304 | | - |
305 | 229 | def test_search_removes_malformed_entries(urllib3_mock): |
306 | 230 | """ |
307 | 231 | The CDX index contains many lines for things that can't actually be |
|
0 commit comments