|
| 1 | ++++ |
| 2 | +date = '2025-11-19T8:00:00+08:00' |
| 3 | +draft = false |
| 4 | +title = 'Python Asyncio 01: Getting to know asyncio' |
| 5 | +tags = ['Python', 'Asyncio'] |
| 6 | ++++ |
| 7 | + |
| 8 | +## Python asyncio 基础篇 |
| 9 | + |
| 10 | +本篇包含 |
| 11 | + |
| 12 | +- asyncio 是什么以及如何使用它 |
| 13 | +- concurrency 并发、parallelism 并行、threads 线程和 processes 进程 |
| 14 | +- GIL (global interpreter lock) 全局解释器锁和其带来的并发跳转 |
| 15 | +- 非阻塞 sockets 如何只通过一个线程实现并发 |
| 16 | +- 基于事件循环 (event-loop-based) 并发的基本原理 |
| 17 | + |
| 18 | +asynchronous programming 异步编程意思是可以在主程序之外,额外运行一个特定的长时运行的任务。 |
| 19 | + |
| 20 | +一个 coroutine 协程是一种方法,协程是一种方法,当遇到可能长时间运行的任务时,它可以暂停执行,并在任务完成后恢复执行。 |
| 21 | + |
| 22 | +asyncio 这个库的名称可能让人人为其只适合编写 I/O 操作,但实际上该库可以和 multithreading 和 multiprocessing 库结合使用。 |
| 23 | +基于这种 interoperability 互操作性,可以使用 async/await 关键字让工作流更加容易理解。 |
| 24 | +这意味着,asyncio 不仅适合 I/O 的并发,也可以在 CPU 密集操作中使用。 |
| 25 | + |
| 26 | +所谓的 I/O-bound 和 CPU-bound 是指限制程序运行更快的主要因素,这意味着如果增加该方面的性能,程序就能够在更短的时间内完成。 |
| 27 | + |
| 28 | +下面是一些例子 |
| 29 | + |
| 30 | +- I/O 密集操作:网络请求、文件读取 |
| 31 | +- CPU 密集操作:循环遍历文件夹、计算 pi |
| 32 | + |
| 33 | +```Python |
| 34 | +import requests |
| 35 | + |
| 36 | +response = requests.get('https://www.example.com') # 1 |
| 37 | +items = response.headers.items() |
| 38 | +headers = [f'{key}: {headers}' for key, header in items] # 2 |
| 39 | +formatted_headers = '\n'.join(headers) # 3 |
| 40 | +with open('headers.txt', 'w') as file: # 4 |
| 41 | + file.write(formatted_headers) |
| 42 | +``` |
| 43 | + |
| 44 | +1. I/O-bound 网络请求 |
| 45 | +2. CPU-bound 响应处理 |
| 46 | +3. CPU-bound 字符串拼接 |
| 47 | +4. I/O-bound 写入磁盘 |
| 48 | + |
| 49 | +Concurrency 并发 和 Parallelism 并行的区别这里就不多说了。 |
| 50 | + |
| 51 | +Multitasking 分为 preemptive multitasking 抢占式多任务处理 和 cooperative multitasking 协作式多任务处理。 |
| 52 | + |
| 53 | +- Preemptive Multitasking |
| 54 | + |
| 55 | + 在抢占式多任务处理中,通过时间片轮转 (time slicing) 进程,让操作系统来决定执行哪些任务切换。 |
| 56 | + 当操作系统在不同任务之间切换的时候,我们称其为抢占 (preempting)。 |
| 57 | + 该机制底层如何工作取决于操作系统,这主要通过多进程或多线程实现 |
| 58 | + |
| 59 | +- Coorperative Multitasking |
| 60 | + |
| 61 | + 在协作式多任务处理中,不让操作系统决定何时切换任务,而是通过明确的编码来指明何时可以切换到其他任务上。 |
| 62 | + 应用任务运行一个协作模型,显示地说“我要暂停这个任务一会儿,执行别的任务去吧”。 |
| 63 | + |
| 64 | + 协作式的优势:首先协作式资源消耗更低,当操作系统需要在进程或线程间切换的时候,会有一个上下文切换的过程。 |
| 65 | + 第二个是粒度,操作系统切换任务的时间点可能并非最优的,当并发处理的时候就有了更多控制权,可以在正确的时间切换任务。 |
| 66 | + |
| 67 | +### Process 进程 |
| 68 | + |
| 69 | +一个进程有独立的内存空间,且其他应用不能访问。 |
| 70 | +一台机器可以运行多个进程,如果 CPU 是多核的,那么可以同时运行多个进程。 |
| 71 | + |
| 72 | +### Thread 线程 |
| 73 | + |
| 74 | +线程是操作系统的最小管理单元,可以看作是一个轻量的进程。 |
| 75 | +它们没有进程那样的独立内存空间,相反,线程共享进程创建的内存空间。 |
| 76 | + |
| 77 | +线程是和创建他们的进程相关联的,进程至少会有一个线程,一般称为主线程 main thread。 |
| 78 | +进程还可以创建其他线程,一般称为 worker 或 background threads。 |
| 79 | +这些线程可以和主线程并发运行,操作系统也可以在时间片(time slicing)轮转时切换他们。 |
| 80 | + |
| 81 | +当启动一个 Python 应用的时候,我们会创建一个进程和一个主线程来负责运行 Python 应用。 |
| 82 | + |
| 83 | +```Python |
| 84 | +import os |
| 85 | +import threading |
| 86 | + |
| 87 | +print(f"Python process running with process id: {os.getpid()}") |
| 88 | +total_threads = threading.active_count() |
| 89 | +thread_name = threading.current_thread().name |
| 90 | + |
| 91 | +print(f"Python is currently running: {total_threads} thread(s).") |
| 92 | +print(f"The current thread is {thread_name}") |
| 93 | +``` |
| 94 | + |
| 95 | +输出为 |
| 96 | + |
| 97 | +```text |
| 98 | +Python process running with process id: 62973 |
| 99 | +Python is currently running: 1 thread(s). |
| 100 | +The current thread is MainThread |
| 101 | +``` |
| 102 | + |
| 103 | +- 多线程 Python 应用示例 |
| 104 | + |
| 105 | +```Python |
| 106 | +import threading |
| 107 | + |
| 108 | + |
| 109 | +def hello_from_thread(): |
| 110 | + """Print current thread name.""" |
| 111 | + print(f"Hello from thread {threading.current_thread()}!") |
| 112 | + |
| 113 | +# 显示创建的 thread (not main thread) |
| 114 | +hello_thread = threading.Thread(target=hello_from_thread) |
| 115 | +hello_thread.start() # start thread |
| 116 | + |
| 117 | +total_threads = threading.active_count() |
| 118 | +thread_name = threading.current_thread().name |
| 119 | + |
| 120 | +print(f"Python is currently running {total_threads} thread(s)") |
| 121 | +print(f"The current thread is {thread_name}") |
| 122 | + |
| 123 | +hello_thread.join() # pause untill started and completed |
| 124 | +``` |
| 125 | + |
| 126 | +输出为 |
| 127 | + |
| 128 | +```text |
| 129 | +Hello from thread <Thread(Thread-1 (hello_from_thread), started 6111047680)>! |
| 130 | +Python is currently running 2 thread(s) |
| 131 | +The current thread is MainThread |
| 132 | +``` |
| 133 | + |
| 134 | +要注意,运行上面代码可能会看到线程中输出 hello 的内容,并且看到 "Python is currently runnig 2 thread(s)" 在统一行输出两次。 |
| 135 | +这里一种 race condition 竟态条件,多线程是许多编程语言的实现并发的一种方式,但由于 GIL 的限制,python 的多线程只对 I/O-bound 类型有效。 |
| 136 | + |
| 137 | +此外,还可以使用多进程,即 multiprocessing,一个父进程会创建子进程并管理这些进程,然后将工作内存分发给这些子进程。 |
| 138 | +multiprocessing 的 API 类似 multithreading,首先创建一个 target function,然后调用 start 方法执行,最后 join 等待运行完成。 |
| 139 | + |
| 140 | +```Python |
| 141 | +import os |
| 142 | +import multiprocessing |
| 143 | + |
| 144 | + |
| 145 | +def hello_from_process(): |
| 146 | + print(f"Hello from child process {os.getpid()}!") |
| 147 | + |
| 148 | +if __name__ == "__main__": |
| 149 | + hello_process = multiprocessing.Process(target=hello_from_process) |
| 150 | + hello_process.start() |
| 151 | + |
| 152 | + print(f"Hello from parent process {os.getpid()}") |
| 153 | + hello_process.join() |
| 154 | +``` |
| 155 | + |
| 156 | +输出为 |
| 157 | + |
| 158 | +```text |
| 159 | +Hello from parent process 35718 |
| 160 | +Hello from child process 35720! |
| 161 | +``` |
| 162 | + |
| 163 | +多进程尤其适合 CPU 密集任务。 |
| 164 | + |
| 165 | +### Global interpreter lock |
| 166 | + |
| 167 | +全局解释器锁会使得 python 进程只能同时有一个线程运行 python 代码。 |
| 168 | + |
| 169 | +全局解释器存在的原因是由于 CPython 的内存管理方式,在 CPython 中内存主要通过引用计数 (reference counting) 的方式管理。 |
| 170 | +引用计数的工作原理是跟踪当前哪些程序需要访问特定的 Python 对象,如整数、字典或列表。 |
| 171 | + |
| 172 | +这里的冲突是 CPython 多线程不是线程安全的,如果有多余两个线程同时修改同一个变量,则该变量的状态将变得未知。 |
| 173 | +当两个线程访问同一个 Python 对象的时候会产生竟态条件,最终可能导致应用崩溃。 |
| 174 | + |
| 175 | +这是否意味着在 Python 中无法利用多线程的性能优势了呢? |
| 176 | +实际上,对于 I/O 任务仍然可以通过多线程加速(在一些特殊情况下有一些 cpu 密集的例外可以多线程并发),来看下面的示例 |
| 177 | + |
| 178 | +```Python |
| 179 | +import time |
| 180 | +import requests |
| 181 | + |
| 182 | +def read_example() -> None: |
| 183 | + response = resuests.get("https://www.example.com") |
| 184 | + print(response.status_code) |
| 185 | + |
| 186 | +sync_start = time.time() |
| 187 | +read_example() |
| 188 | +read_example() |
| 189 | +sync_end = time.time() |
| 190 | + |
| 191 | +print(f"Running synchronously took {sync_end - sync_start:.4f} seconds.") |
| 192 | +``` |
| 193 | + |
| 194 | +应该会看到下面这样输出 |
| 195 | + |
| 196 | +```text |
| 197 | +200 |
| 198 | +200 |
| 199 | +Running synchronously took 0.2066 seconds. |
| 200 | +``` |
| 201 | + |
| 202 | +下面再编写一个多线程版本对比一下 |
| 203 | + |
| 204 | +```Python |
| 205 | +import time |
| 206 | +import requests |
| 207 | +import threading |
| 208 | + |
| 209 | +def read_example() -> None: |
| 210 | + response = requests.get("https://www.example.com") |
| 211 | + print(response.status_code) |
| 212 | + |
| 213 | +thread_1 = threading.Thread(target=read_example) |
| 214 | +thread_2 = threading.Thread(target=read_example) |
| 215 | + |
| 216 | +thread_start = time.time() |
| 217 | +thread_1.start() |
| 218 | +thread_2.start() |
| 219 | +print("All threads running!") |
| 220 | +thread_end = time.time() |
| 221 | + |
| 222 | +thread_1.join() |
| 223 | +thread_2.join() |
| 224 | + |
| 225 | +thread_end = time.time() |
| 226 | +print(f"Running with threads took {thread_end - thread_start:.4f} seconds.") |
| 227 | +``` |
| 228 | + |
| 229 | +输出为 |
| 230 | + |
| 231 | +```text |
| 232 | +All threads running! |
| 233 | +200 |
| 234 | +200 |
| 235 | +Running with threads took 0.0917 seconds. |
| 236 | +``` |
| 237 | + |
| 238 | +这几乎有两倍的差距! |
| 239 | +答案藏在系统后台调用中,在这种情况下 I/O 会释放锁,因为网络请求是在操作系统层处理的。 |
| 240 | +这时,只有当收到数据返回为 Python 对象的时候才会重新获取锁。 |
| 241 | +换句话说,在 Java 或 C++ 这样语言中,这种情况下会并行执行,然而 Python 由于 GIL 的限制只能进行 I/O 类型的并发,同一时刻只能有一份 Python 代码在执行。 |
| 242 | + |
| 243 | +### asyncio and the GIL |
| 244 | + |
| 245 | +asyncio 利用 I/O 操作会释放 GIL 的特性,使得即使只有单个线程也能够实现并发。 |
| 246 | +当使用 asyncio 的时候会创建叫 coroutines 协程的对象,一个协程可以被看成是一个轻量级线程。 |
| 247 | + |
| 248 | +这意味着 asyncio 并非绕过了 GIl,而是仍然受其限制。 |
| 249 | +如果我们有一个 cpu-bound 任务,我们仍然需要多进程来并发执行。 |
| 250 | + |
| 251 | +### How single-threaded concurrency works |
| 252 | + |
| 253 | +之所以可以实现单线程并发是因为在系统层面,I/O 操作可以被并发完成。 |
| 254 | +为了更好理解,先需要弄懂 sockets 如何工作,有其实非阻塞套接字 non-blocking sockets。 |
| 255 | + |
| 256 | +socket 本质上网络发送和接收数据的是底层抽象,这是数据在服务器之前传输的基础。 |
| 257 | +sockets 支持两个简本的操作:发送字节和接收字节。 |
| 258 | +可以将 sockets 类比为邮箱,你可以向邮箱中放入信封,收件人打开邮箱和你的邮件,根据右键里面内容,收件人可能会给你回信。 |
| 259 | + |
| 260 | +sockets 默认是阻塞的,简单来说,这意味着当我们在等待服务器回复数据的时候,会暂停当前应用或阻塞直到收到数据。 |
| 261 | +但在操作系统层面,sockets 可以在非阻塞模式下运行,并可以执行即发即弃的读写操作,程序能够继续执行其他任务。 |
| 262 | +然后,会在操作系统层收到并处理字节。不是阻塞和等待数据返回,而是通过更加响应式的方式,让操作系统数据准备好后通知我们。 |
| 263 | + |
| 264 | +在后台,这时通过不同的通知系统实现的,根据操作系统会有不同: |
| 265 | + |
| 266 | +- kqueue - FreeBSD and MacOS |
| 267 | +- epool - Linux |
| 268 | +- IOCP (I/O complection port) - Windows |
| 269 | + |
| 270 | +这些通知系统是 asyncio 实现的基础,Python 中一个最基础的事件循环 event loop 类似下面这样: |
| 271 | + |
| 272 | +```Python |
| 273 | +from collections import deque |
| 274 | + |
| 275 | +messages = deque() |
| 276 | +while True: |
| 277 | + if messages: |
| 278 | + message = message.pop() |
| 279 | + process_message(message) |
| 280 | +``` |
| 281 | + |
| 282 | +在 asyncio 中,事件循环里面报错的是 tasks 任务,而非消息。 |
| 283 | +当创建事件循环的时候,会创建一个空的任务队列。 |
| 284 | +事件循环的每一次迭代都会检查需要执行的任务,并逐个运行它们,直到某个任务遇到 I/O 操作。 |
| 285 | +这时候任务会被暂停,并且我们指示操作系统监控套接字,等待 I/O 完成,然后去运行下一个任务。 |
| 286 | +在每一次事件循环中,检查是否有任何 I/O 完成了。 |
0 commit comments