@@ -440,4 +440,157 @@ while True:
440440然而,如果我们仅使用 selectors 来构建应用程序,就需要自行实现事件循环才能达到与 asyncio 相同的功能。
441441下面介绍如何使用 async/await 来实现上面功能。
442442
443- ### An echo server on the asyncio event loop
443+ ## An echo server on the asyncio event loop
444+
445+ 使用 ` select ` 对于很多应用来说有些太底层了。
446+ 我们可能希望在等待 socket 的时候,让代码在后台运行,或者我们可能希望按计划执行后台任务。
447+ 如果只使用 selectors 来实现这个,我们将需要构建自己的事件循环,与此同时,asyncio 有一个完整的实现可以使用。
448+ 此外,coroutines 和 tasks 在 selectors 之上提供了抽象层,这使得我们代码更易于实现和维护,无需考虑 selectors 细节。
449+
450+ 下面通过 asyncio 的 coroutines 和 tasks 再次实现前面的 echo server。
451+ 这里仍然会通过底层 API 来实现,这些 API 会返回 coroutines 和 tasks。
452+
453+ ### Event loop coroutines for sockets
454+
455+ 考虑到 sockets 的一个相对底层的概念,处理他们的方法是通过 asyncio 的事件循环。
456+ 下面会使用三种主要的协程处理:
457+
458+ - ` sock_accept `
459+ - ` sock_recv `
460+ - ` sock_sendall `
461+
462+ 这些方法和之前的很类似,不同在于这些方法会将 socket 作为一个参数输入,这样我们可以 ` await ` 返回的协程,直到我们得到可以作用于其上的数据。
463+
464+ 下面先从 ` sock_accept ` 开始,这个协程类似之前的 ` socket.accept ` 方法。
465+
466+ 该方法返回一个 tuple: ` (socket_connection, client_address) ` ,传入感兴趣的 socket,然后 ` await ` 等待连接返回。
467+ 一但接受该协程就能获取到连接与地址,这个 socket 必须是非阻塞的,并和一个端口绑定起来:
468+
469+ ``` Python
470+ connection, addresss = await loop.socke_accept(socket)
471+ ```
472+
473+ ` sock_recv ` 和 ` sock_sendall ` 也是类似的,输入一个 socket,然后 ` await ` 等待结果。
474+
475+ - ` sock_recv ` 会等待直到有可以处理的字节
476+ - ` sock_sendall ` 同时接受一个 socket 和要发送的 data,它会等待直到所有数据成功发送至 socket,并在成功后返回 ` None `
477+
478+ ``` Python
479+ data = await loop.sock_recv(socket)
480+ success = await loop.sock_sendall(socket, data)
481+ ```
482+
483+ ### Designing an asyncio echo server
484+
485+ 之前介绍了 coroutines 和 tasks,那么什么时候使用 coroutine,什么时候使用 task 呢?
486+ 让我们来审视一下,我们希望应用程序如何表现以做出这一判断。
487+
488+ 我们从如何监听应用连接开始。
489+ 当监听应用连接的时候,一次将只能处理一个连接,因为 ` socket.accept ` 只会给客户端一个连接。
490+ 如果有多个连接到达,后续的连接会被存储到一个被称作 ` backlog ` 的队列里面。
491+
492+ 由于不需要并发处理多个连接,单个协程循环就足够了。
493+ 这样能够让其他代码在等待连接的时候并发执行。
494+ 这里定义一个一直循环的协程 ` listen_for_connections `
495+
496+ ``` Python
497+ async def listen_for_connections (server_socket : socket, loop : AbstractEventLoop):
498+ while True :
499+ connection, address = await loop.sock_accept(server_socket)
500+ connection.setblocking(False )
501+ print (f " Got a connection from { address} " )
502+ ```
503+
504+ 这样就有了一个监听连接的协程,由于要并发处理多个 connection,因此这里要为每个 connection 创建一个 task 来读写数据。
505+
506+ 这里将创建负责处理数据的协程 ` echo ` ,这个协程会一直循环来接收来自 client 的数据,一但其收到数据,就写会到 client 中去。
507+ 然后在 ` listen_for_connections ` 里,创建一个 task 来包装 ` echo ` 协程。
508+
509+ ``` Python
510+ import asyncio
511+ import socket
512+ from asyncio import AbstractEventLoop
513+
514+
515+ async def echo (connection : socket, loop : AbstractEventLoop) -> None :
516+ while data := await loop.sock_recv(connection, 1024 ):
517+ await loop.sock_sendall(connection, data)
518+
519+ async def listen_for_connections (server_socket : socket, loop : AbstractEventLoop):
520+ while True :
521+ connection, address = await loop.sock_accept(server_socket)
522+ connection.setblocking(False )
523+ print (f " Got a connection from { address} " )
524+
525+ asyncio.create_task(echo(connection, loop))
526+
527+ async def main ():
528+ server_socket = socket.socket(socket.AF_INET , socket.SOCK_STREAM )
529+ server_socket.setsockopt(socket.SOL_SOCKET , socket.SO_REUSEADDR , 1 )
530+
531+ server_address = (" 127.0.0.1" , 8000 )
532+ server_socket.setblocking(False )
533+ server_socket.bind(server_address)
534+ server_socket.listen()
535+
536+ await listen_for_connections(server_socket, asyncio.get_event_loop())
537+
538+ asyncio.run(main())
539+ ```
540+
541+ 架构如下:
542+
543+ - 协程 ` listen_for_connections ` 持续监听连接,收到连接后该协程就会切换到 ` echo ` task 去处理每个连接
544+ - Client 1 echo task <--Read/Write--> Client 1
545+ - Client 2 echo task <--Read/Write--> Client 2
546+ - Client 3 echo task <--Read/Write--> Client 3
547+
548+ 这样设计的 echo server 实际上有一个问题,下面来解决
549+
550+ ### Handling errors in tasks
551+
552+ 网络连接通常都是不可靠的,我们可能得到非预期的报错。
553+ 下面修改 echo 的实现,添加一个错误处理的代码:
554+
555+ ``` Python
556+ async def echo (connection : socket, loop : AbstractEventLoop) -> None :
557+ while data := await loop.recv(connection, 1024 ):
558+ if data == b " boom\r\n " :
559+ raise Exception (" Unexcepted network error" )
560+ await loop.sock_sendall(connection, data)
561+ ```
562+
563+ 现在只要发送 boom 就会导致下面这样的报错:
564+
565+ ``` text
566+ Got a connection from ('127.0.0.1', 49470)
567+ Task exception was never retrieved
568+ future: <Task finished name='Task-2' coro=<echo() done, defined at /Users/starslayerx/GitHub/book_asyncio/asyncio_echo_server.py:4> exception=Exception('Unexcepted network error')>
569+ Traceback (most recent call last):
570+ File "/Users/starslayerx/GitHub/book_asyncio/asyncio_echo_server.py", line 7, in echo
571+ raise Exception("Unexcepted network error")
572+ ```
573+
574+ 这里的重点在于 ` Task exception was never retrieved ` 。
575+ 当一个异常在 task 内部被抛出时,这个任务会被视为已完成,并且它的“结果”就是这个异常。
576+ 这意味着异常不会沿着调用栈向上传递。
577+ 此外,这里没有任何清理逻辑。
578+ 如果该异常被抛出,则无法对任务失败做出反应,因为从未获取 retrieve 这个异常。
579+
580+ 要让异常真正传递,必须在 await 表达式中使用 task。
581+ 当 await 一个失败的 task 时,异常会在 await 的地方重新抛出,其 traceback 也会在该位置体现。
582+ 如果在程序中从未 await 一个 task,就有可能永远看不到这个 task 抛出的异常。
583+
584+ 下面演示,与其忽略在 ` listen_for_connection ` 里面创建的 echo tasks,我们通过列表来跟踪他们
585+
586+ ``` Python
587+ tasks = []
588+ async def listen_for_connection (server_socket : socket, loop : AbstractEventLoop):
589+ while True :
590+ connection, address = await loop.socket_accept(server_socket)
591+ connection.setblocking(False )
592+ print (f " Got a connection from { address} " )
593+ tasks.append(
594+ asyncio.create_task(echo(connection, loop))
595+ )
596+ ```
0 commit comments