
Commit eafada2

Merge pull request #14 from realpython/asyncio-walkthrough
Companion materials for asyncio-walkthrough
2 parents bf724a9 + 2c36d80 commit eafada2


8 files changed: +374, -0 lines changed


asyncio-walkthrough/areq.py

Lines changed: 123 additions & 0 deletions
#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HTML."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')


async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    # Don't do any try/except here. If either the request or reading
    # of bytes raises, let that be handled by the caller.
    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()  # raise if status >= 400
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()  # For bytes: resp.read()

    # Don't close the session; let the caller decide when to do that.
    return html


async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        # May be raised from other libraries, such as chardet or yarl.
        # logger.exception will show the full traceback.
        logger.exception(
            "Non-aiohttp exception occurred: %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        # This portion is not really async, but it is the request/response
        # IO cycle that eats the largest portion of time.
        for link in HREF_RE.findall(html):
            try:
                # Ensure we return an absolute path.
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found


async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)


async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)  # see also: return_exceptions=True


if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    # Header - just a single, initial row-write
    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))
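
The "see also: return_exceptions=True" comment on asyncio.gather() above points at an alternative failure mode. A minimal, hypothetical sketch of what that flag changes (not part of this commit; the coroutine names ok, boom, and demo are made up for illustration):

import asyncio


async def ok() -> str:
    return "ok"


async def boom() -> str:
    raise ValueError("boom")


async def demo() -> None:
    # With return_exceptions=True, gather() returns raised exceptions as
    # ordinary results in order, instead of propagating the first one and
    # abandoning the rest -- so one bad URL would not abort a whole crawl.
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # ['ok', ValueError('boom')]


if __name__ == "__main__":
    asyncio.run(demo())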

asyncio-walkthrough/asyncq.py

Lines changed: 72 additions & 0 deletions
#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time


async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()


async def seconds() -> float:
    return time.perf_counter()


async def randint(a: int, b: int) -> int:
    return random.randint(a, b)


async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = await randint(a, b)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


async def produce(name: int, q: asyncio.Queue) -> None:
    n = await randint(1, 5)
    for _ in it.repeat(None, n):  # Synchronous
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = await seconds()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")


async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = await seconds()
        print(
            f"Consumer {name} got element <{i}>" f" in {now-t:0.5f} seconds."
        )
        q.task_done()


async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()
    for c in consumers:
        c.cancel()


if __name__ == "__main__":
    import argparse

    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

asyncio-walkthrough/chained.py

Lines changed: 51 additions & 0 deletions
#!/usr/bin/env python3
# chained.py

import asyncio
import random
import time


async def randint(a: int, b: int) -> int:
    return random.randint(a, b)


async def part1(n: int) -> str:
    i = await randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result


async def part2(n: int, arg: str) -> str:
    i = await randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result


async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")


async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))


if __name__ == "__main__":
    import sys

    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end:0.2f} seconds.")

asyncio-walkthrough/countasync.py

Lines changed: 23 additions & 0 deletions
#!/usr/bin/env python3
# countasync.py

import asyncio


async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")


async def main():
    await asyncio.gather(count(), count(), count())


if __name__ == "__main__":
    import time

    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

asyncio-walkthrough/countsync.py

Lines changed: 22 additions & 0 deletions
#!/usr/bin/env python3
# countsync.py

import time


def count():
    print("One")
    time.sleep(1)
    print("Two")


def main():
    for _ in range(3):
        count()


if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

asyncio-walkthrough/phases.py

Lines changed: 35 additions & 0 deletions
#!/usr/bin/env python3
# phases.py

import asyncio


async def phase1(callerid: str):
    print(f"phase 1 called from {callerid}")
    await asyncio.sleep(2)
    return "result1"


async def phase2(callerid: str, arg: str):
    print(f"phase 2 called from {callerid}")
    await asyncio.sleep(2)
    # No await needed here - arg is passed from caller.
    return f"result2 derived from {arg}"


async def outer(callerid: str):
    """A wrapper for parameterizing a full coroutine."""
    print(f"outer called from {callerid}")
    r1 = await phase1(callerid)
    r2 = await phase2(callerid, r1)
    return r1, r2


async def main():
    """Wrap the coroutines into tasks and execute."""
    results = await asyncio.gather(*(outer(i) for i in "ABC"))
    return results


if __name__ == "__main__":
    asyncio.run(main())

asyncio-walkthrough/rand.py

Lines changed: 40 additions & 0 deletions
#!/usr/bin/env python3
# rand.py

import asyncio
import random

# colors
c = (
    "\033[0m",   # end of color
    "\033[36m",  # cyan
    "\033[91m",  # red
    "\033[35m",  # magenta
)


async def randint(a: int, b: int) -> int:
    return random.randint(a, b)


async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = await randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = await randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i


async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res


if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

asyncio-walkthrough/urls.txt

Lines changed: 8 additions & 0 deletions
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt
