|
| 1 | ++++ |
| 2 | +date = '2025-11-20T8:00:00+08:00' |
| 3 | +draft = false |
| 4 | +title = 'Python Asyncio 02: Asyncio Basics Part 1' |
| 5 | +tags = ['Python', 'Asyncio'] |
| 6 | ++++ |
| 7 | + |
| 8 | +### Introducing coroutines |
| 9 | + |
| 10 | +创建一个协程 coroutine 和创建一个函数很类型,使用 `async def` 关键字,而不是 `def`: |
| 11 | + |
| 12 | +```Python |
| 13 | +async def coroutine_add_one(number: int) -> int: |
| 14 | + return number + 1 |
| 15 | + |
| 16 | +def add_one(number: int) -> int: |
| 17 | + return number + 1 |
| 18 | + |
| 19 | +function_result = add_one(1) |
| 20 | +coroutine_result = coroutine_add_one(1) |
| 21 | + |
| 22 | +print(f"Function result is {function_result} and the type is {type(function_result)}") |
| 23 | +print(f"Coroutine result is {coroutine_result} and the type is {type(coroutine_result)}") |
| 24 | +``` |
| 25 | + |
| 26 | +输出如下 |
| 27 | + |
| 28 | +```text |
| 29 | +Function result is 2 and the type is <class 'int'> |
| 30 | +Coroutine result is <coroutine object coroutine_add_one at 0x103000a00> and the type is <class 'coroutine'> |
| 31 | +``` |
| 32 | + |
| 33 | +可以看到,协程返回的不是值,而是一个协程对象。 |
| 34 | +这里协程并没有执行,而是创建了一个协程对象可在之后运行,要运行一个协程则必须显式地在一个事件循环中运行它。 |
| 35 | +在 Python 3.7 之后的版本,必须创建事件循环来运行它。 |
| 36 | +asyncio 库添加了多个函数,抽象了事件循环的管理,例如 `asyncio.run()`,可以使用它来运行协程: |
| 37 | + |
| 38 | +```Python |
| 39 | +coroutine_result = asyncio.run(coroutine_add_one(1)) |
| 40 | +``` |
| 41 | + |
| 42 | +`asyncio.run()` 在此场景下运行了多个重要的事情。 |
| 43 | +首先,它创建了一个全新的时间循环,一但成功创建,就会去运行所有的协程直到结束,并返回结果。 |
| 44 | +该函数还会在主协程完成后清理任何可能仍在运行的内容。 |
| 45 | +等运行结束后,它会关闭并终止事件循环。 |
| 46 | + |
| 47 | +关于 `asyncio.run()` 最重要的是,它期望成为程序的主入口,它只执行一个协程。 |
| 48 | + |
| 49 | +使用 `await` 关键字会让后面的协程运行,不同于普通的函数调用,`await` 关键字可以暂停协程并获取到结果,而不是一个协程对象。 |
| 50 | + |
| 51 | +例如下面这样: |
| 52 | + |
| 53 | +```Python |
| 54 | +import asyncio |
| 55 | + |
| 56 | +async def add_one(number: int) -> int: |
| 57 | + return number + 1 |
| 58 | + |
| 59 | +async def main() -> None: |
| 60 | + one_plus_one = await add_one(1) # 暂停并等待 add_one(1) 结果 |
| 61 | + two_plus_one = await add_one(2) # 暂停并等待 add_one(2) 结果 |
| 62 | + print(one_plus_one) |
| 63 | + print(two_plus_one) |
| 64 | + |
| 65 | +asyncio.run(main()) |
| 66 | +``` |
| 67 | + |
| 68 | +### Introducing long-running coroutines with sleep |
| 69 | + |
| 70 | +首先创建一个通用的 `delay()` 函数,这里在 `asynco.sleep()` 前后分别输出以观察暂停协程的时候,是否有其他代码在运行。 |
| 71 | + |
| 72 | +```Python |
| 73 | +import asyncio |
| 74 | + |
| 75 | +async def delay(delay_seconds: int) -> int: |
| 76 | + print(f"sleeping for {delay_seconds} second(s)") |
| 77 | + await asyncio.sleep(delay_seconds) |
| 78 | + print(f"finished sleeping for {delay_seconds} second(s)") |
| 79 | + return delay_seconds |
| 80 | +``` |
| 81 | + |
| 82 | +现在将这个函数打包到 `util/delay_functions.py` 里面 |
| 83 | + |
| 84 | +```python |
| 85 | +import asyncio |
| 86 | +from util.delay_functions import delay |
| 87 | + |
| 88 | +async def add_one(number: int) -> int: |
| 89 | + return number + 1 |
| 90 | + |
| 91 | +async def hello_world_message() -> str: |
| 92 | + await delay(1) |
| 93 | + return "Hello World!" |
| 94 | + |
| 95 | +async def main() -> None: |
| 96 | + message = await hello_world_message() # 1: Pause main until hello_world_message() returns |
| 97 | + one_plus_one = await add_one(1) # 2: Pause main until add_one() returns |
| 98 | + print(one_plus_one) |
| 99 | + print(message) |
| 100 | + |
| 101 | +asyncio.run(main()) |
| 102 | +``` |
| 103 | + |
| 104 | +上面这份代码实际上仍然会顺序运行,先暂停一秒,然后再加 1,因为 `await` 会完全暂停主函数。 |
| 105 | + |
| 106 | +如果想要暂停和加 1 并发运行,需要使用 tasks。 |
| 107 | + |
| 108 | +--- |
| 109 | + |
| 110 | +Tasks 是协程的 wrappers 他们会尽可能快地将协程调度到事件循环中去。 |
| 111 | +这种调度与执行以非阻塞方式进行,意味着一但创建任务,我们就可以在任务运行的同时执行任意其他代码。 |
| 112 | +使用 `await` 关键字的方法会阻塞代码,这意味着在获取返回结果前,该协程将完全被暂停。 |
| 113 | + |
| 114 | +下面创建两个任务 |
| 115 | + |
| 116 | +```Python |
| 117 | +import asyncio |
| 118 | +from util.delay_functions import delay |
| 119 | + |
| 120 | +async def main(): |
| 121 | + sleep_for_three = asyncio.create_task(delay(3)) |
| 122 | + print(type(sleep_for_three)) |
| 123 | + result = await sleep_for_three |
| 124 | + print(result) |
| 125 | + |
| 126 | +asyncio.run(main()) |
| 127 | +``` |
| 128 | + |
| 129 | +输出如下 |
| 130 | + |
| 131 | +``` |
| 132 | +<class '_asyncio.Task'> |
| 133 | +sleeping for 3 second(s)! |
| 134 | +finished sleeping for 3 second(s) |
| 135 | +3 |
| 136 | +``` |
| 137 | + |
| 138 | +1. task 类型是 `_asyncio.Task` 和 `coroutine` 不同 |
| 139 | +2. print 语句立刻就输出了 sleeping ...,如果使用 `await` 则要等 3 秒才会显示。 |
| 140 | +3. 在程序的运行环节中,应使用 `await` 关键字,如果不使用,虽然任务会运行,但是当 `asyncio.run` 结束的时候,任务会被立刻清理。 |
| 141 | + |
| 142 | +### Running multiple tasks concurrently |
| 143 | + |
| 144 | +考虑到 tasks 被创建并安排后会尽可能快地开始运行,这将允许同时并发运行多个 tasks。 |
| 145 | + |
| 146 | +```Python |
| 147 | +import asyncio |
| 148 | +from util.delay_functions import delay |
| 149 | + |
| 150 | + |
| 151 | +async def main(): |
| 152 | + sleep_for_three = asyncio.create_task(delay(3)) |
| 153 | + sleep_again = asyncio.create_task(delay(3)) |
| 154 | + sleep_once_more = asyncio.create_task(delay(3)) |
| 155 | + |
| 156 | + await sleep_for_three |
| 157 | + await sleep_again |
| 158 | + await sleep_once_more |
| 159 | + |
| 160 | +asyncio.run(main()) |
| 161 | +``` |
| 162 | + |
| 163 | +`asyncio.create_task()` 会启动协程并立即让事件循环调度它,这里的 `await` 做的只是等待 task 完成,在等待其间,事件循环仍然会继续运行。 |
| 164 | +上面代码一共会消耗 3 秒的时间,如果启动 10 个任务,也只会消耗 3 秒多时间,这就比顺序执行快了 n 倍! |
| 165 | + |
| 166 | +并且不止于此,在等待的时间里,还可以执行其他代码,例如下面每秒输出状态消息: |
| 167 | + |
| 168 | +```Python |
| 169 | +import asyncio |
| 170 | +from util.delay_functions import delay |
| 171 | + |
| 172 | + |
| 173 | +async def hello_every_second(): |
| 174 | + for _ in range(2): |
| 175 | + await asyncio.sleep(1) |
| 176 | + print("I'm running other code while I'm waiting!") |
| 177 | + |
| 178 | + |
| 179 | +async def main(): |
| 180 | + first_delay = asyncio.create_task(delay(3)) |
| 181 | + second_delay = asyncio.create_task(delay(3)) |
| 182 | + await hello_every_second() |
| 183 | + await first_delay |
| 184 | + await second_delay |
| 185 | + |
| 186 | +asyncio.run(main()) |
| 187 | +``` |
| 188 | + |
| 189 | +输出这样: |
| 190 | + |
| 191 | +```text |
| 192 | +sleeping for 3 second(s)! |
| 193 | +sleeping for 3 second(s)! |
| 194 | +I'm running other code while I'm waiting! |
| 195 | +I'm running other code while I'm waiting! |
| 196 | +finished sleeping for 3 second(s) |
| 197 | +finished sleeping for 3 second(s) |
| 198 | +``` |
| 199 | + |
| 200 | +上面存在的一种问题是,这些任务可能消耗不确定长的时间完成。 |
| 201 | +我们可能希望停止运行时间过长的任务,这可以通过取消来实现。 |
| 202 | + |
| 203 | +### Canceling tasks and setting tiemouts |
| 204 | + |
| 205 | +网络连接是不可靠的,用户的网速也可能下降,web 服务器可能崩溃使得请求不可达。 |
| 206 | +在上面的例子中,一个 task 可能永远也不会结束,程序可能卡在 `await` 语句等待不存在的回复。 |
| 207 | + |
| 208 | +Task 对象都有一个 `cancel` 方法,任何时候想要停止任务就可以调用该方法。 |
| 209 | +取消 task 会导致一个 `CancelledError` 的报错,当 `await` 任务的时候,必须要能处理该错误。 |
| 210 | + |
| 211 | +下面假设最多运行 5 秒任务: |
| 212 | + |
| 213 | +```Python |
| 214 | +import asyncio |
| 215 | +from asyncio import CancelledError |
| 216 | +from util.delay_functions import delay |
| 217 | + |
| 218 | +async def main(): |
| 219 | + long_task = asyncio.create_task(delay(10)) |
| 220 | + seconds_elapsed = 0 |
| 221 | + |
| 222 | + while not long_task.done(): |
| 223 | + print("Task not finished, checking again in a second.") |
| 224 | + await asyncio.sleep(1) |
| 225 | + seconds_elapsed = seconds_elapsed + 1 |
| 226 | + if seconds_elapsed == 5: |
| 227 | + long_task.cancel() |
| 228 | + try: |
| 229 | + await long_task |
| 230 | + except CancelledError: |
| 231 | + print("Our task was cancelled.") |
| 232 | + |
| 233 | +asyncio.run(main()) |
| 234 | +``` |
| 235 | + |
| 236 | +`done` 方法返回 bool 任务是否已经完成,还有很重要的一点就是,`CancelledError` 这个报错只能在 `await` 中抛出。 |
| 237 | +也就是说,调用 `cancel` 并不会立即停止该任务,只有在下一次 `await` 时才真正停止任务。 |
| 238 | + |
| 239 | +实际上,asyncio 提供了 `wait_for` 函数,该函数接受一个协程 coroutine 或任务 task,以及一个超时时间 timeout, 单位为秒。 |
| 240 | +如果时间超过了预设的 timeout,则会抛出一个 `TimeoutException`,超过阀值 threshold 后任务会自动取消。 |
| 241 | + |
| 242 | +下面例子中,有个运行两秒的例子,但是超时时间为 1 秒: |
| 243 | + |
| 244 | +```Python |
| 245 | +import asyncio |
| 246 | +from util.delay_functions import delay |
| 247 | + |
| 248 | + |
| 249 | +async def main(): |
| 250 | + delay_task = asyncio.create_task(delay(2)) |
| 251 | + try: |
| 252 | + result = await asyncio.wait_for(delay_task, timeout=1) |
| 253 | + print(result) |
| 254 | + except asyncio.exceptions.TimeoutError: |
| 255 | + print("Got a timeout.") |
| 256 | + print(f"Was the task cancelled? {delay_task.cancelled()}") |
| 257 | + |
| 258 | +asyncio.run(main()) |
| 259 | +``` |
| 260 | + |
| 261 | +输出超时信息 |
| 262 | + |
| 263 | +```text |
| 264 | +sleeping for 2 second(s)! |
| 265 | +Got a timeout. |
| 266 | +Was the task cancelled? True |
| 267 | +``` |
| 268 | + |
| 269 | +有时候我们可能并不想直接取消任务,而是当超时后通知用户,这个可以通过 `asyncio.shield` 将任务包装起来实现。 |
| 270 | +该函数会避免传入的协程被取消,提供给它一个 "shield" 从而忽略掉取消请求。 |
| 271 | + |
| 272 | +```Python |
| 273 | +import asyncio |
| 274 | +from util import delay |
| 275 | + |
| 276 | + |
| 277 | +async def main(): |
| 278 | + task = asyncio.create_task(delay(10)) |
| 279 | + |
| 280 | + try: |
| 281 | + result = await asyncio.wait_for(asyncio.shield(task), 5) # wrap task with shield |
| 282 | + print(result) |
| 283 | + except TimeoutError: |
| 284 | + print("Task took longer than 5 seconds, it will finish soon!") |
| 285 | + result = await task |
| 286 | + print(result) |
| 287 | + |
| 288 | +asyncio.run(main()) |
| 289 | +``` |
| 290 | + |
| 291 | +输出如下 |
| 292 | + |
| 293 | +```text |
| 294 | +sleeping for 10 second(s)! |
| 295 | +Task took longer than 5 seconds, it will finish soon! |
| 296 | +finished sleeping for 10 second(s) |
| 297 | +10 |
| 298 | +``` |
0 commit comments