1+ import mimetypes
2+ import urllib .request
3+ from email .message import Message
4+ from pathlib import Path
5+ from urllib .request import Request
6+
7+ import requests
8+ from requests import Response
9+
10+ _original_urlopen = urllib .request .urlopen
11+ _original_requests_session_request = requests .Session .request
12+
13+ _url_mappings : dict [str , Path ] = {}
14+ _mocked = False
15+
16+
17+ class MockHTTPResponse :
18+ def __init__ (self , url , content , status = 200 , headers : dict [str , str ] | None = None ):
19+ self .url = url
20+ self ._content = content .encode ('utf-8' ) if isinstance (content , str ) else content
21+ self ._status = status
22+ self .headers = Message ()
23+ mime_type = mimetypes .guess_type (url )[0 ]
24+
25+ if mime_type is not None :
26+ self .headers .add_header ('Content-Type' , mime_type )
27+ if isinstance (headers , dict ):
28+ for h , v in headers .items ():
29+ self .headers .add_header (h , v )
30+
31+ def read (self ):
32+ return self ._content
33+
34+ def getcode (self ):
35+ return self ._status
36+
37+ def geturl (self ):
38+ return self .url
39+
40+ def info (self ):
41+ return self .headers
42+
43+ def __enter__ (self ):
44+ return self
45+
46+ def __exit__ (self , * args ):
47+ pass
48+
49+
50+ class MockRequestsResponse (Response ):
51+ def __init__ (self , url , content , status_code = 200 ):
52+ super ().__init__ ()
53+ self .url = url
54+ self .status_code = status_code
55+ self ._content = content
56+ mime_type = mimetypes .guess_type (url )[0 ]
57+ if mime_type is not None :
58+ self .headers ['Content-Type' ] = mime_type
59+
60+
61+ def load_content (url : str ):
62+ url_mapping , local_path = None , None
63+ for um , lp in _url_mappings .items ():
64+ if url .startswith (um ):
65+ url_mapping = um
66+ local_path = lp
67+ break
68+ if not url_mapping :
69+ return None
70+
71+ rel_path = url [len (url_mapping ):]
72+ if rel_path .startswith ('/' ):
73+ rel_path = rel_path [1 :]
74+ local_file = local_path / rel_path
75+ if not local_file .exists ():
76+ raise IOError (f'Local file { local_file } for URL { url } from mapping { url_mapping } does not exist' )
77+ with open (local_file , 'rb' ) as f :
78+ return f .read ()
79+
80+
81+ def enable (url_mappings : dict [str , str | Path ] | None = None ):
82+ global _url_mappings , _mocked
83+ if url_mappings is None :
84+ _url_mappings .clear ()
85+ else :
86+ _url_mappings = {k : Path (v ) for k , v in url_mappings .items ()}
87+
88+ if not _mocked :
89+
90+ def mock_urlopen (request : str | Request , * args , ** kwargs ):
91+ url = request if not isinstance (request , Request ) else request .full_url
92+ content = load_content (url )
93+ if content is not None :
94+ return MockHTTPResponse (url = url , content = content )
95+ else :
96+ return _original_urlopen (request , * args , ** kwargs )
97+
98+ def mock_requests_session_requests (self , method , url , * args , ** kwargs ):
99+ content = load_content (url )
100+ if content is not None :
101+ return MockRequestsResponse (url = url , content = content )
102+ else :
103+ return _original_requests_session_request (self , method , url , * args , ** kwargs )
104+
105+ urllib .request .urlopen = mock_urlopen
106+ requests .Session .request = mock_requests_session_requests
107+ _mocked = True
108+
109+
110+ def disable ():
111+ urllib .request .urlopen = _original_urlopen
112+ requests .Session .request = _original_requests_session_request
113+
114+
115+ # Enable with empty mappings to override elements from the moment we are imported
116+ enable ()
0 commit comments