
Commit c13edce

Refactor
1 parent 19a9651 commit c13edce

1 file changed

asyncio-walkthrough/areq.py

Lines changed: 83 additions & 79 deletions
@@ -7,14 +7,16 @@
 import logging
 import re
 import sys
-from typing import List, Tuple
+from typing import BinaryIO, Union
 import urllib.error
 import urllib.parse
 
 import aiofiles
 import aiohttp
+from aiohttp import ClientSession, ClientTimeout
 from aiohttp.helpers import sentinel
 
+
 logging.basicConfig(
     format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
     level=logging.DEBUG,
@@ -25,118 +27,120 @@
 logging.getLogger("chardet.charsetprober").disabled = True
 
 HREF_RE = re.compile(r'href="(.*?)"')
-DEFAULT_GET_TIMEOUT = aiohttp.ClientTimeout(total=10)  # seconds
-
 
-class BadURLError(Exception):
-    """Blanket exception for a bad page."""
+# You can specify timeouts for both the session as a whole and
+# for individual requests.
+#
+# https://aiohttp.readthedocs.io/en/stable/client_quickstart.html#timeouts
+DEFAULT_GET_TIMEOUT = ClientTimeout(total=8)  # seconds
 
-    pass
 
-
-async def get(
+async def fetch_html(
     url: str,
-    session: aiohttp.ClientSession,
-    timeout: aiohttp.ClientTimeout = DEFAULT_GET_TIMEOUT,
+    session: ClientSession,
+    timeout: ClientTimeout,
     **kwargs,
-) -> aiohttp.ClientResponse:
-    """GET request; return only valid responses.
+) -> str:
+    """GET request wrapper to fetch page HTML.
 
     kwargs are passed to `session.request()`.
     """
 
-    # We only want to return valid responses. This means the request
-    # itself was made successfully *and* status code is good.
-    try:
-        resp = await session.request(
-            method="GET", url=url, timeout=timeout, **kwargs
-        )
-    except Exception as e:
-        logger.exception("Problem getting response for URL: %s", url)
-        raise BadURLError("Failed request") from e
-    try:
-        resp.raise_for_status()
-    except Exception as e:
-        logger.error("Bad status [%s] for URL: %s", resp.status, url)
-        raise BadURLError("Bad status") from e
-    else:
-        logger.info("Got response [%s] for URL: %s", resp.status, url)
-        # Dont close session; let caller decide when to do that.
-        return resp
+    # Don't do any try/except here. If either the request or reading
+    # of bytes raises, let that be handled by the caller.
+    resp = await session.request(
+        method="GET", url=url, timeout=timeout, **kwargs
+    )
+    resp.raise_for_status()  # raise if status >= 400
+    logger.info("Got response [%s] for URL: %s", resp.status, url)
+    html = await resp.text()  # For bytes: resp.read()
+
+    # Don't close session; let caller decide when to do that.
+    return html
 
 
 async def parse(
     url: str,
-    session: aiohttp.ClientSession,
-    timeout: aiohttp.ClientTimeout = DEFAULT_GET_TIMEOUT,
+    session: ClientSession,
+    timeout: ClientTimeout = DEFAULT_GET_TIMEOUT,
     **kwargs,
-) -> Tuple[str, set]:
-    # Pass through the URL so that the end caller knows which URLs map
-    # to which.
-    res = set()
+) -> set:
+    """Find HREFs in the HTML of `url`."""
+    found = set()
     try:
-        resp = await get(url=url, session=session, timeout=timeout, **kwargs)
-    except BadURLError as e:
-        return url, res
-    try:
-        # This effectively functions like a "try-except-else"
-        html = await resp.text()
-    except:  # noqa
-        logger.exception("Problem getting text for URL: %s", url)
-        return url, res
+        html = await fetch_html(url=url, session=session, timeout=timeout,
+                                **kwargs)
+    except (
+        aiohttp.ClientError,
+        aiohttp.http_exceptions.HttpProcessingError
+    ) as e:
+        logger.error(
+            'aiohttp exception for %s [%s]: %s',
+            url, getattr(e, 'status', None), getattr(e, 'message', None)
+        )
+        return found
+    except Exception as e:
+        # May be raised from other libraries, such as chardet or yarl.
+        # logger.exception will show the full traceback.
+        logger.exception('Non-aiohttp exception occurred: %s',
+                         getattr(e, '__dict__', {}))
+        return found
     else:
-        if not html:
-            return url, res
-
-        # This portion is not really async, but it is the request/response
-        # IO cycle that eats the largest portion of time.
-        for link in HREF_RE.findall(html):
-            try:
-                # Ensure we return an absolute path.
-                abslink = urllib.parse.urljoin(url, link)
-            except (urllib.error.URLError, ValueError):
-                logger.exception("Error parsing URL: %s", link)
-                pass
-            else:
-                res.add(abslink)
-        logger.info("Found %d links for %s", len(res), url)
-        return url, res
-
-
-async def write(file, url: str, **kwargs) -> None:
-    _, res = await parse(url=url, **kwargs)
+        # This portion is not really async, but it is the request/response
+        # IO cycle that eats the largest portion of time.
+        for link in HREF_RE.findall(html):
+            try:
+                # Ensure we return an absolute path.
+                abslink = urllib.parse.urljoin(url, link)
+            except (urllib.error.URLError, ValueError):
+                logger.exception("Error parsing URL: %s", link)
+                pass
+            else:
+                found.add(abslink)
+        logger.info("Found %d links for %s", len(found), url)
+        return found
+
+
+async def write_one(file: BinaryIO, url: str, **kwargs) -> None:
+    """Write the found HREFs from `url` to `file`."""
+    res = await parse(url=url, **kwargs)
+    if not res:
+        return None
     async with aiofiles.open(file, "a") as f:
         for p in res:
             await f.write(f"{url}\t{p}\n")
         logger.info("Wrote results for source URL: %s", url)
 
 
-async def bulk_get_and_write(
-    file, urls: set, session_timeout=None, **kwargs
-) -> List[Tuple[str, set]]:
-    if session_timeout is not sentinel:
-        # A conservative timeout estimate is 10 seconds per URL.
-        session_timeout = aiohttp.ClientTimeout(total=10 * len(urls))
-    async with aiohttp.ClientSession(timeout=session_timeout) as session:
+async def bulk_crawl_and_write(
+    file: BinaryIO,
+    urls: set,
+    timeout: Union[object, ClientTimeout] = sentinel,
+    **kwargs
+) -> None:
+    """Crawl & write concurrently to `file` for multiple `urls`."""
+    async with ClientSession() as session:
         tasks = []
         for url in urls:
-            tasks.append(write(file=file, url=url, session=session, **kwargs))
-        await asyncio.gather(*tasks, return_exceptions=True)
+            tasks.append(
+                write_one(file=file, url=url, session=session, **kwargs)
+            )
+        await asyncio.gather(*tasks)  # see also: return_exceptions=True
 
 
 if __name__ == "__main__":
     import pathlib
+    import sys
 
+    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
     here = pathlib.Path(__file__).parent
 
-    urls = set()
     with open(here.joinpath("urls.txt")) as infile:
-        for i in infile:
-            urls.add(i.strip())
+        urls = set(map(str.strip, infile))
 
-    # Header - just a single row-write
+    # Header - just a single, initial row-write
     outpath = here.joinpath("foundurls.txt")
     with open(outpath, "w") as outfile:
         outfile.write("source_url\tparsed_url\n")
 
-    asyncio.run(bulk_get_and_write(file=outpath, urls=urls))
+    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))
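
Side note on the two features the new comments point at: aiohttp lets a timeout be set once on the ClientSession and overridden per request with a ClientTimeout, and asyncio.gather() can collect exceptions as results instead of propagating the first one (the `# see also: return_exceptions=True` hint). A minimal, self-contained sketch of both, assuming aiohttp 3.x; it is not part of this commit, and the URLs and the fetch_status helper are made up for illustration:

import asyncio

from aiohttp import ClientSession, ClientTimeout


async def fetch_status(session: ClientSession, url: str) -> int:
    # Per-request timeout: overrides the session-wide value for this call only.
    resp = await session.request("GET", url, timeout=ClientTimeout(total=8))
    resp.raise_for_status()
    return resp.status


async def main() -> None:
    # Session-wide timeout: applies to every request made through the session
    # unless a call passes its own ClientTimeout.
    async with ClientSession(timeout=ClientTimeout(total=60)) as session:
        urls = ["https://www.example.com", "https://www.python.org"]
        tasks = [fetch_status(session, url) for url in urls]
        # return_exceptions=True keeps one failing URL from cancelling the
        # rest; failures come back as exception objects in `results`.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for url, result in zip(urls, results):
            print(url, result)


if __name__ == "__main__":
    asyncio.run(main())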
