Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import collections
import collections.abc
import concurrent.futures
import enum
import errno
import heapq
import itertools
Expand Down Expand Up @@ -272,6 +273,23 @@ async def restore(self):
self._proto.resume_writing()


class _ServerState(enum.Enum):
"""This tracks the state of Server.

-[in]->NOT_STARTED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN

- in: Server.__init__()
- ss: Server._start_serving()
- cl: Server.close()
- wk: Server._wakeup() *only called if number of clients == 0
"""

NOT_STARTED = "not_started"
SERVING = "serving"
CLOSED = "closed"
SHUTDOWN = "shutdown"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the name SHUTDOWN here, but needed something more "definitive" than closed.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

I think that makes sense.

I'm more worried about INITIALIZED here; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something like NOT_SERVING or NOT_YET_STARTED.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 07129e5



class Server(events.AbstractServer):

def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
Expand All @@ -287,32 +305,47 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._ssl_shutdown_timeout = ssl_shutdown_timeout
self._serving = False
self._state = _ServerState.NOT_STARTED
self._serving_forever_fut = None

def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self, transport):
assert self._sockets is not None
if self._state != _ServerState.SERVING:
raise RuntimeError("server is not serving, cannot attach transport")
self._clients.add(transport)

def _detach(self, transport):
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
self._wakeup()
if self._state == _ServerState.CLOSED and len(self._clients) == 0:
self._shutdown()

def _shutdown(self):
if self._state == _ServerState.CLOSED:
self._state = _ServerState.SHUTDOWN
elif self._state == _ServerState.SHUTDOWN:
# gh109564: the wakeup method has two possible call-sites,
# through an explicit call Server.close(), or indirectly through
# Server._detach() by the last connected client.
return
else:
raise RuntimeError(f"server {self!r} must be closed before shutdown")

def _wakeup(self):
waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(None)

def _start_serving(self):
if self._serving:
if self._state == _ServerState.NOT_STARTED:
self._state = _ServerState.SERVING
elif self._state == _ServerState.SERVING:
return
self._serving = True
else:
raise RuntimeError(f'server {self!r} was already started and then closed')
Copy link
Contributor

@kumaraditya303 kumaraditya303 Aug 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the existing message was clear enough, it would avoid changing tests too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 6c8d3df :)


for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
Expand All @@ -324,7 +357,7 @@ def get_loop(self):
return self._loop

def is_serving(self):
return self._serving
return self._state == _ServerState.SERVING

@property
def sockets(self):
Expand All @@ -333,23 +366,30 @@ def sockets(self):
return tuple(trsock.TransportSocket(s) for s in self._sockets)

def close(self):
sockets = self._sockets
if sockets is None:
if self._state in {_ServerState.CLOSED, _ServerState.SHUTDOWN}:
return
self._sockets = None

for sock in sockets:
self._loop._stop_serving(sock)
prev_state = self._state
try:
self._state = _ServerState.CLOSED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if something magically goes wrong after this has been set? Is the server still alive while thinking it's closed? It might be worth adding a try/except to undo this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah great point! Although I'm not sure how that would look from a user's point of view. Should we give them a warning to let them know they must try to close the server again? Alternatively, should we expose something like a force kwarg/flag?

I pushed an initial fix with a simple try/except to recover the previous state on fail in 48a3c0d


self._serving = False
sockets = self._sockets
if sockets is None:
return
self._sockets = None

if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None
for sock in sockets:
self._loop._stop_serving(sock)

if len(self._clients) == 0:
self._wakeup()
if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None

if len(self._clients) == 0:
self._shutdown()
except:
self._state = prev_state

def close_clients(self):
for transport in self._clients.copy():
Expand All @@ -369,8 +409,6 @@ async def serve_forever(self):
if self._serving_forever_fut is not None:
raise RuntimeError(
f'server {self!r} is already being awaited on serve_forever()')
if self._sockets is None:
raise RuntimeError(f'server {self!r} is closed')

self._start_serving()
self._serving_forever_fut = self._loop.create_future()
Expand Down Expand Up @@ -407,7 +445,7 @@ async def wait_closed(self):
# from two places: self.close() and self._detach(), but only
# when both conditions have become true. To signal that this
# has happened, self._wakeup() sets self._waiters to None.
if self._waiters is None:
if self._state == _ServerState.SHUTDOWN:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
Expand Down
2 changes: 1 addition & 1 deletion Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._closing = False # Set when close() called.
self._called_connection_lost = False
self._eof_written = False
if self._server is not None:
if self._server is not None and self._server.is_serving():
self._server._attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
Expand Down
3 changes: 2 additions & 1 deletion Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,8 +794,9 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._closing = False # Set when close() called.
self._paused = False # Set when pause_reading() called

if self._server is not None:
if self._server is not None and self._server.is_serving():
self._server._attach(self)

loop._transports[self._sock_fd] = self

def __repr__(self):
Expand Down
10 changes: 8 additions & 2 deletions Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import threading
import unittest
from unittest.mock import Mock

from test.support import socket_helper
from test.test_asyncio import utils as test_utils
Expand Down Expand Up @@ -65,7 +66,7 @@ async def main(srv):
self.assertIsNone(srv._waiters)
self.assertFalse(srv.is_serving())

with self.assertRaisesRegex(RuntimeError, r'is closed'):
with self.assertRaisesRegex(RuntimeError, r'started and then closed'):
self.loop.run_until_complete(srv.serve_forever())


Expand Down Expand Up @@ -118,7 +119,7 @@ async def main(srv):
self.assertIsNone(srv._waiters)
self.assertFalse(srv.is_serving())

with self.assertRaisesRegex(RuntimeError, r'is closed'):
with self.assertRaisesRegex(RuntimeError, r'started and then closed'):
self.loop.run_until_complete(srv.serve_forever())


Expand Down Expand Up @@ -186,6 +187,8 @@ async def serve(rd, wr):
loop.call_soon(srv.close)
loop.call_soon(wr.close)
await srv.wait_closed()
self.assertTrue(task.done())
self.assertFalse(srv.is_serving())

async def test_close_clients(self):
async def serve(rd, wr):
Expand All @@ -212,6 +215,9 @@ async def serve(rd, wr):
await asyncio.sleep(0)
self.assertTrue(task.done())

with self.assertRaisesRegex(RuntimeError, r'started and then closed'):
await srv.start_serving()

async def test_abort_clients(self):
async def serve(rd, wr):
fut.set_result((rd, wr))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan.
Loading