Skip to content

Commit f3d55dd

Browse files
committed
post: python asyncio 03
1 parent 88ede3a commit f3d55dd

File tree

32 files changed

+2210
-144
lines changed

32 files changed

+2210
-144
lines changed

content/posts/2025-12-01_python-asyncio-03.md

Lines changed: 282 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
+++
22
date = '2025-12-01T8:00:00+08:00'
3-
draft = true
3+
draft = false
44
title = 'Python asyncio 03: A first asyncio application'
55
tags = ['Python', 'Asyncio']
66
+++
@@ -594,3 +594,284 @@ async def listen_for_connection(server_socket: socket, loop: AbstractEventLoop):
594594
asyncio.create_task(echo(connection, loop))
595595
)
596596
```
597+
598+
看上去可能会和之前一样,如果输入 boom 应该会看到警告被打印出来,同时伴随着从未获取任务异常的警告。
599+
然而,实际上并不是这样,除非强制终止程序,否则什么都看不到。
600+
601+
这样因为我们保留了 task 的引用,asyncio 只有在任务被垃圾回收的之后才能打印出 traceback 或 failed task 相关信息。
602+
这是因为无法判断该任务是否会在应用程序的其他某个时刻被等待,从而可能引发异常。
603+
鉴于这种复杂性,要么 await tasks,要么处理所有可能的异常。
604+
605+
在 echo server 中,首先可以做的是使用 try/catch 语法,记录下异常并关闭连接:
606+
607+
```Python
608+
import logging
609+
610+
async def echo(connection: socket, loop: AbstractEventLoop) -> None:
611+
try:
612+
while data := await loop.sock_recv(connection, 1024):
613+
print("got data!")
614+
if data == b"boom\r\n":
615+
raise Exception("Unexcepted network error")
616+
await loop.sock_sendall(connection, data)
617+
except Exception as ex:
618+
logging.exception(ex)
619+
finally:
620+
connection.close()
621+
```
622+
623+
这将解决因异常导致的直接问题,即服务器会报错任务异常未被捕获,因为我们在协程内部已自行处理。
624+
这也将在 finally 块中正确关闭 socket,因此不会留下一个未关闭的悬空异常。
625+
626+
该实现会在应用程序关闭时,正确关闭所有已建立的客户端连接。
627+
在之前章节说过,asyncio.run 会取消未完成的 tasks。
628+
如果 await 这个 task 则会产生一个 CancelledError。
629+
630+
这里的重点是要注意异常是在何处产生的。
631+
如果你的任务是在等待一个类似 `await loop.sock_recv` 的语句,并且取消那个 task,这行就会产生一个 `CancelledError`
632+
这意味着对于上述代码,在任务被取消时 finally 块依然会执行,因为取消任务时我们就在 await 语句上抛出了一个异常。
633+
如果我们修改 except 代码,使其捕获并记录 CancelledError,你将会看到每个任务都会产生一个 CancelledError。
634+
635+
上面已经解决了 echo tasks 失败的情况,那如果应用程序关闭后要如何处理这些失败的 tasks 呢?
636+
下面介绍 asyncio signal handlers
637+
638+
## Shutting down gracefully
639+
640+
如果我们想要关闭我们的应用程序怎么办?
641+
我们可以向应用添加自定义关闭逻辑,任何还在运行中的 tasks 都能在几秒内发送需要的信息。
642+
643+
> 本篇内容不适用 Windows 系统:
644+
> [https:// stackoverflow.com/questions/35772001](https:// stackoverflow.com/questions/35772001)
645+
646+
### Listening for signals
647+
648+
Singals 信号是一个 Unix 系统的概念。
649+
例如使用 Ctrl-C 来关闭一个命令行工具就是发送了信号 SIGINT (singal interrupt),这和在 Python 中捕获 KeyboardInterput 异常一样。
650+
另一个常见的信号是 SIGTERM,这是使用 kill 命令杀死某个进程的信号。
651+
652+
为了实现自定义关闭逻辑,将会监听 SIGINT 和 SIGTERM 信号。
653+
并在监听器中实现允许任何 echo tasks 能够在几秒内完成。
654+
655+
asyncio 事件循环允许我们通过 `add_signal_handler` 方法直接监听任何事件。
656+
这与 signal 模块中的 `signal.signal` 函数设置的信号处理器不同。
657+
`add_signal_handler` 能够安全地与事件循环交互。
658+
该函数接收一个信号和一个函数,当监听到相应信号后会调用传入的函数。
659+
660+
下面编写一个 signal handler 来取消所有运行中的 tasks。
661+
asyncio 有个方便的 tasks 叫做 `asyncio.all_tasks`
662+
663+
```Python
664+
# Adding a signal handler to cancel all tasks
665+
import asyncio
666+
import signal
667+
from asyncio import AbstractEventLoop
668+
669+
670+
from util.delay_functions import delay
671+
672+
673+
def cancel_tasks():
674+
print("Got a SIGNAL!")
675+
tasks: set[asyncio.Task] = asyncio.all_tasks()
676+
print(f"Cancelling {len(tasks)} task(s).")
677+
[task.cancel() for task in tasks]
678+
679+
async def main():
680+
loop: AbstractEventLoop = asyncio.get_running_loop()
681+
loop.add_signal_handler(signal.SIGINT, cancel_tasks)
682+
await delay(10)
683+
684+
asyncio.run(main())
685+
```
686+
687+
从 Python 3.11 版本开始,`asyncio.run()` 的设计趋于严格,不再允许 `asyncio.run()` 管理的事件训练里添加信号处理器。
688+
689+
因此不同版本行为会不一致,上面代码在 Python 3.12 需要这样写
690+
691+
```Python
692+
import asyncio
693+
import signal
694+
695+
from util.delay_functions import delay
696+
697+
698+
def cancel_tasks():
699+
print("Got a Signal!")
700+
for task in asyncio.all_tasks():
701+
task.cancel()
702+
703+
async def main():
704+
try:
705+
await delay(10)
706+
except asyncio.CancelledError:
707+
print("Main task cancelled.")
708+
709+
loop = asyncio.new_event_loop()
710+
asyncio.set_event_loop(loop)
711+
loop.add_signal_handler(signal.SIGINT, cancel_tasks)
712+
loop.run_until_complete(main())
713+
loop.close()
714+
```
715+
716+
### Waiting for pending tasks ot finish
717+
718+
在原始的问题里,希望 echo server 的 tasks 能够在关闭前再运行几秒钟。
719+
一种实现方式就是使用 `wait_for` 包装并 await 所有的 tasks。
720+
如果任务超时,则会产生一个 TimeoutError,之后就可以终止程序。
721+
722+
在 shutdown handler 里的一个问题是,这是一个普通的函数,无法在其内部 await。
723+
一种解决方法就是创建一个协程来处理关闭逻辑,并将其包装成一个 task:
724+
725+
```
726+
async def await_all_tasks():
727+
tasks = asyncio.all_tasks()
728+
[await task for task in tasks]
729+
730+
async def main():
731+
loop = asyncio.get_event_loop()
732+
# Warp into a task
733+
loop.add_signal_handler(singal.SIGINT, lambda: asyncio.create_task(await_all_tasks()))
734+
```
735+
736+
这样虽然能工作,但问题是如果 await_all_tasks 内触发了一个异常,将会产生一个孤儿 task,并抛出 "exception was never retrieved" (异常未捕获)的警告,可能隐藏潜在的错误。
737+
738+
我们可以通过抛出一个自定义异常来停止主协程的运行,从而处理这个问题。
739+
这样,当运行主协程时,我们可以捕获这个异常并执行任何关闭逻辑。
740+
要实现这个功能,需要我们自定义事件循环,而不是使用 asyncio.run。
741+
这是因为在 asyncio.run 中会取消所有在运行中的 tasks,这意味着我们不能将 echo tasks 给 wrap 包装进 `wait_for` 中:
742+
743+
```Python
744+
class GracefulExit(SystemExit):
745+
pass
746+
747+
def shutdown():
748+
raise GracefulExit
749+
750+
loop = asyncio.get_event_loop()
751+
loop.add_signal_handler(signal.SIGINT, shutdown)
752+
753+
try:
754+
loop.run_until_complete(main())
755+
except GracefulExit:
756+
loop.run_until_complete(close_echo_tasks(echo_tasks))
757+
finally:
758+
loop.close()
759+
```
760+
761+
顺着上面的思路来编写关闭逻辑
762+
763+
```Python
764+
async def close_echo_tasks(echo_tasks: list[asyncio.Task]):
765+
waiters = [asyncio.wait_for(task, 2) for task in echo_tasks]
766+
for task in waiters:
767+
try:
768+
await task
769+
except asyncio.exceptions.TimeoutError:
770+
# Except a timeout error here
771+
pass
772+
```
773+
774+
`close_echo_tasks` 中,我们使用一个 echo tasks 的列表,并将其包装到 `wait_for` task 里,设置 2 秒的超时时间。
775+
这意味着,在我们取消这些任务前,他们将会有 2 秒时间结束任务。
776+
我们捕获在这两秒任务中任何的 TimeoutErrors。
777+
778+
结合上面说的所有逻辑,echo server 的关闭逻辑总体如下:
779+
780+
```Python
781+
import asyncio
782+
from asyncio import AbstractEventLoop
783+
import socket
784+
import logging
785+
import signal
786+
787+
788+
async def echo(connection: socket.socket, loop: AbstractEventLoop) -> None:
789+
try:
790+
while data := await loop.sock_recv(connection, 1024):
791+
print("got data!")
792+
if data == b"boom\r\n":
793+
raise Exception("Unexpected network error")
794+
await loop.sock_sendall(connection, data)
795+
except Exception as ex:
796+
logging.exception(ex)
797+
finally:
798+
connection.close()
799+
800+
echo_tasks = []
801+
802+
async def connection_listener(server_socket, loop):
803+
while True:
804+
connection, address = await loop.sock_accept(server_socket)
805+
connection.setblocking(False)
806+
print(f"Got a connection from {address}")
807+
echo_task = asyncio.create_task(echo(connection, loop))
808+
echo_tasks.append(echo_task)
809+
810+
class GracefulExit(SystemExit):
811+
pass
812+
813+
def shutdown():
814+
raise GracefulExit()
815+
816+
async def close_echo_tasks(echo_tasks: list[asyncio.Task]):
817+
waiters = [asyncio.wait_for(task, 2) for task in echo_tasks]
818+
# 这里实际上会串行等待
819+
for task in waiters:
820+
try:
821+
await task
822+
except asyncio.exceptions.TimeoutError:
823+
pass
824+
# 并行等待
825+
# results = await asyncio.gather(*waiters, return_exceptions=True)
826+
827+
async def main():
828+
server_socket = socket.socket()
829+
try:
830+
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
831+
832+
server_address = ("127.0.0.1", 8002)
833+
server_socket.setblocking(False)
834+
server_socket.bind(server_address)
835+
server_socket.listen()
836+
837+
for signame in {"SIGINT", "SIGTERM"}:
838+
loop.add_signal_handler(getattr(signal, signame), shutdown)
839+
await connection_listener(server_socket, loop)
840+
finally:
841+
server_socket.close()
842+
843+
loop = asyncio.new_event_loop()
844+
845+
try:
846+
loop.run_until_complete(main())
847+
except GracefulExit:
848+
loop.run_until_complete(close_echo_tasks(echo_tasks))
849+
finally:
850+
loop.close()
851+
```
852+
853+
这段代码是一个支持优雅关闭 Graceful Shutdown 的异步 TCP Echo Server。
854+
855+
核心思想:当收到关闭信号时,不是立刻终止,而是给正在处理的连接一些时间来完成工作。
856+
857+
- `echo`: Echo 处理函数,接收数据原样返回,当接收到 "boom\r\n" 时模拟网络错误
858+
- `cancel_listener`: 连接管理器,异步等待新连接,为每个连接创建一个 task
859+
- `shutdown`: 优雅关闭机制,通过自定义异常实现优雅关闭
860+
- `close_echo_tasks`: 任务清理函数,给每个任务最多 2 秒时间完成,超时后捕获 TimeoutError 并忽略
861+
- `main`: 主函数,允许端口重用,并注册关闭信号
862+
863+
现在通过 `telnet localhost 8002` 发送连接后,按 Ctrl-C 会等待两秒才关闭。
864+
865+
但这段代码并不适合生产环境:
866+
867+
首先是在等待 echo tasks 完成的时候,我们不会关闭 connection_listener。
868+
即在关闭的时候,可能会有一个新的请求过来,这个新请求将无法被加入 2 秒的优雅关闭逻辑中。
869+
870+
另一个原因是,每个 echo task 的关闭逻辑只会捕获 TimeoutExceptions。
871+
这意味着,如果我们的某个任务抛出了其他类型的异常,我们将捕获该异常,而其他后续任务中可能出现的任何异常都将被忽略。
872+
873+
## Summary
874+
875+
在本章,展示了如何使用 blocking 和 non-blocking sockets,并更加深入了 asyncio 事件循环。
876+
并使用 asyncio 创建了一个高并发的 echo server。
877+
并展示了如何在 tasks 中处理错误,并添加自定义的关闭逻辑。

0 commit comments

Comments
 (0)