- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 33.2k
GH-109564: add asyncio.Server state machine #131009
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
0fec860
              f3b96bf
              8e409b7
              a92158a
              44d24fb
              07129e5
              48a3c0d
              5832655
              7f3481b
              f32efd7
              61b502f
              6c8d3df
              d4ceedc
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -16,6 +16,7 @@ | |
| import collections | ||
| import collections.abc | ||
| import concurrent.futures | ||
| import enum | ||
| import errno | ||
| import heapq | ||
| import itertools | ||
|  | @@ -272,6 +273,23 @@ async def restore(self): | |
| self._proto.resume_writing() | ||
|  | ||
|  | ||
| class _ServerState(enum.Enum): | ||
| """This tracks the state of Server. | ||
|  | ||
| -[in]->INITIALIZED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN | ||
|  | ||
| - in: Server.__init__() | ||
| - ss: Server._start_serving() | ||
| - cl: Server.close() | ||
| - wk: Server._wakeup() *only called if number of clients == 0 | ||
| """ | ||
|  | ||
| INITIALIZED = "initialized" | ||
| SERVING = "serving" | ||
| CLOSED = "closed" | ||
| SHUTDOWN = "shutdown" | ||
|  | ||
|  | ||
| class Server(events.AbstractServer): | ||
|  | ||
| def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | ||
|  | @@ -287,32 +305,49 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | |
| self._ssl_context = ssl_context | ||
| self._ssl_handshake_timeout = ssl_handshake_timeout | ||
| self._ssl_shutdown_timeout = ssl_shutdown_timeout | ||
| self._serving = False | ||
| self._state = _ServerState.INITIALIZED | ||
| self._serving_forever_fut = None | ||
|  | ||
| def __repr__(self): | ||
| return f'<{self.__class__.__name__} sockets={self.sockets!r}>' | ||
|  | ||
| def _attach(self, transport): | ||
| assert self._sockets is not None | ||
| if self._state != _ServerState.SERVING: | ||
| raise RuntimeError("server is not serving, cannot attach transport") | ||
| self._clients.add(transport) | ||
|  | ||
| def _detach(self, transport): | ||
| self._clients.discard(transport) | ||
| if len(self._clients) == 0 and self._sockets is None: | ||
| if self._state == _ServerState.CLOSED and len(self._clients) == 0: | ||
|         
                  ordinary-jamie marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| self._wakeup() | ||
|  | ||
| def _wakeup(self): | ||
| match self._state: | ||
|          | ||
| case _ServerState.SHUTDOWN: | ||
| # gh109564: the wakeup method has two possible call-sites, | ||
| # through an explicit call Server.close(), or indirectly through | ||
| # Server._detach() by the last connected client. | ||
| return | ||
| case _ServerState.INITIALIZED | _ServerState.SERVING: | ||
| raise RuntimeError("cannot wakeup server before closing") | ||
| case _ServerState.CLOSED: | ||
| self._state = _ServerState.SHUTDOWN | ||
|  | ||
| waiters = self._waiters | ||
| self._waiters = None | ||
| for waiter in waiters: | ||
| if not waiter.done(): | ||
| waiter.set_result(None) | ||
|  | ||
| def _start_serving(self): | ||
| if self._serving: | ||
| return | ||
| self._serving = True | ||
| match self._state: | ||
| case _ServerState.SERVING: | ||
| return | ||
| case _ServerState.CLOSED | _ServerState.SHUTDOWN: | ||
| raise RuntimeError(f'server {self!r} is closed') | ||
| case _ServerState.INITIALIZED: | ||
| self._state = _ServerState.SERVING | ||
|  | ||
| for sock in self._sockets: | ||
| sock.listen(self._backlog) | ||
| self._loop._start_serving( | ||
|  | @@ -324,7 +359,7 @@ def get_loop(self): | |
| return self._loop | ||
|  | ||
| def is_serving(self): | ||
| return self._serving | ||
| return self._state == _ServerState.SERVING | ||
|  | ||
| @property | ||
| def sockets(self): | ||
|  | @@ -333,6 +368,13 @@ def sockets(self): | |
| return tuple(trsock.TransportSocket(s) for s in self._sockets) | ||
|  | ||
| def close(self): | ||
| match self._state: | ||
| case _ServerState.CLOSED | _ServerState.SHUTDOWN: | ||
| # Shutdown state can only be reached after closing. | ||
| return | ||
| case _: | ||
| self._state = _ServerState.CLOSED | ||
|  | ||
| sockets = self._sockets | ||
| if sockets is None: | ||
| return | ||
|  | @@ -341,8 +383,6 @@ def close(self): | |
| for sock in sockets: | ||
| self._loop._stop_serving(sock) | ||
|  | ||
| self._serving = False | ||
|  | ||
| if (self._serving_forever_fut is not None and | ||
| not self._serving_forever_fut.done()): | ||
| self._serving_forever_fut.cancel() | ||
|  | @@ -369,8 +409,6 @@ async def serve_forever(self): | |
| if self._serving_forever_fut is not None: | ||
| raise RuntimeError( | ||
| f'server {self!r} is already being awaited on serve_forever()') | ||
| if self._sockets is None: | ||
| raise RuntimeError(f'server {self!r} is closed') | ||
|  | ||
| self._start_serving() | ||
| self._serving_forever_fut = self._loop.create_future() | ||
|  | @@ -407,7 +445,7 @@ async def wait_closed(self): | |
| # from two places: self.close() and self._detach(), but only | ||
| # when both conditions have become true. To signal that this | ||
| # has happened, self._wakeup() sets self._waiters to None. | ||
| if self._waiters is None: | ||
| if self._state == _ServerState.SHUTDOWN: | ||
| return | ||
| waiter = self._loop.create_future() | ||
| self._waiters.append(waiter) | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -795,7 +795,12 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): | |
| self._paused = False # Set when pause_reading() called | ||
|  | ||
| if self._server is not None: | ||
| self._server._attach(self) | ||
| if self._server.is_serving(): | ||
| self._server._attach(self) | ||
| else: | ||
| self.abort() | ||
| return | ||
|          | ||
|  | ||
| loop._transports[self._sock_fd] = self | ||
|  | ||
| def __repr__(self): | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -4,6 +4,7 @@ | |
| import time | ||
| import threading | ||
| import unittest | ||
| from unittest.mock import Mock | ||
|  | ||
| from test.support import socket_helper | ||
| from test.test_asyncio import utils as test_utils | ||
|  | @@ -186,6 +187,8 @@ async def serve(rd, wr): | |
| loop.call_soon(srv.close) | ||
| loop.call_soon(wr.close) | ||
| await srv.wait_closed() | ||
| self.assertTrue(task.done()) | ||
| self.assertFalse(srv.is_serving()) | ||
|  | ||
| async def test_close_clients(self): | ||
| async def serve(rd, wr): | ||
|  | @@ -212,6 +215,9 @@ async def serve(rd, wr): | |
| await asyncio.sleep(0) | ||
| self.assertTrue(task.done()) | ||
|  | ||
| with self.assertRaisesRegex(RuntimeError, r'is closed'): | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add new tests for the race mentioned, it would be nice to check if this addresses #123720 and similar issue and add regression tests for them | ||
| await srv.start_serving() | ||
|  | ||
| async def test_abort_clients(self): | ||
| async def serve(rd, wr): | ||
| fut.set_result((rd, wr)) | ||
|  | @@ -266,6 +272,29 @@ async def serve(rd, wr): | |
| await asyncio.sleep(0) | ||
| self.assertTrue(task.done()) | ||
|  | ||
| async def test_close_before_transport_attach(self): | ||
| proto = Mock() | ||
| loop = asyncio.get_running_loop() | ||
| srv = await loop.create_server(lambda *_: proto, socket_helper.HOSTv4, 0) | ||
|  | ||
| await srv.start_serving() | ||
| addr = srv.sockets[0].getsockname() | ||
|  | ||
| # Create a connection to the server but close the server before the | ||
| # socket transport for the connection is created and attached | ||
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | ||
| s.connect(addr) | ||
| await asyncio.sleep(0) # loop select reader | ||
| await asyncio.sleep(0) # accept conn 1 | ||
| srv.close() | ||
|  | ||
| # Ensure the protocol is given an opportunity to handle this event | ||
| # gh109564: the transport would be unclosed and will cause a loop | ||
| # exception due to a double-call to Server._wakeup | ||
| await asyncio.sleep(0) | ||
| await asyncio.sleep(0) | ||
| proto.connection_lost.assert_called() | ||
|  | ||
|  | ||
| # Test the various corner cases of Unix server socket removal | ||
| class UnixServerCleanupTests(unittest.IsolatedAsyncioTestCase): | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about the name
SHUTDOWNhere, but needed something more "definitive" thanclosed.If we do use
SHUTDOWNshould I also renameServer._wakeup->Server._shutdown?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes sense.
I'm more worried about
INITIALIZEDhere; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something likeNOT_SERVINGorNOT_YET_STARTED.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 07129e5